From b962942b0f27105d286594b187f994601603cbf6 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 7 Feb 2024 18:26:13 +0100 Subject: [PATCH] WIP AMQP Management --- .gitignore | 1 + deps/amqp10_client/BUILD.bazel | 2 +- deps/amqp10_client/Makefile | 2 +- deps/amqp10_client/src/amqp10_client.erl | 10 +- .../src/amqp10_client_session.erl | 74 ++- deps/amqp10_client/src/amqp10_msg.erl | 15 +- deps/amqp10_common/include/amqp10_types.hrl | 7 + deps/rabbit/app.bzl | 3 + deps/rabbit/include/rabbit_amqp.hrl | 3 - deps/rabbit/src/rabbit_amqp_management.erl | 163 ++++++ deps/rabbit/src/rabbit_amqp_reader.erl | 8 +- deps/rabbit/src/rabbit_amqp_session.erl | 541 ++++++++++++++---- deps/rabbit/src/rabbit_reader.erl | 2 +- deps/rabbit/test/amqp_client_SUITE.erl | 2 +- deps/rabbitmq_amqp_client/.gitignore | 17 + deps/rabbitmq_amqp_client/BUILD.bazel | 90 +++ deps/rabbitmq_amqp_client/LICENSE | 4 + .../rabbitmq_amqp_client/LICENSE-MPL-RabbitMQ | 373 ++++++++++++ deps/rabbitmq_amqp_client/Makefile | 22 + deps/rabbitmq_amqp_client/README.md | 22 + deps/rabbitmq_amqp_client/app.bzl | 67 +++ deps/rabbitmq_amqp_client/erlang.mk | 1 + .../rabbitmq-components.mk | 1 + .../src/rabbitmq_amqp_client.erl | 201 +++++++ .../test/management_SUITE.erl | 131 +++++ .../test/rabbit_mgmt_http_SUITE.erl | 2 +- moduleindex.yaml | 3 + 27 files changed, 1592 insertions(+), 175 deletions(-) create mode 100644 deps/rabbit/src/rabbit_amqp_management.erl create mode 100644 deps/rabbitmq_amqp_client/.gitignore create mode 100644 deps/rabbitmq_amqp_client/BUILD.bazel create mode 100644 deps/rabbitmq_amqp_client/LICENSE create mode 100644 deps/rabbitmq_amqp_client/LICENSE-MPL-RabbitMQ create mode 100644 deps/rabbitmq_amqp_client/Makefile create mode 100644 deps/rabbitmq_amqp_client/README.md create mode 100644 deps/rabbitmq_amqp_client/app.bzl create mode 120000 deps/rabbitmq_amqp_client/erlang.mk create mode 120000 deps/rabbitmq_amqp_client/rabbitmq-components.mk create mode 100644 deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl create mode 100644 deps/rabbitmq_amqp_client/test/management_SUITE.erl diff --git a/.gitignore b/.gitignore index 10c7e4b7a4b6..a0e7d87e0c53 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ !/deps/amqp10_common/ !/deps/oauth2_client/ !/deps/rabbitmq_amqp1_0/ +!/deps/rabbitmq_amqp_client/ !/deps/rabbitmq_auth_backend_cache/ !/deps/rabbitmq_auth_backend_http/ !/deps/rabbitmq_auth_backend_ldap/ diff --git a/deps/amqp10_client/BUILD.bazel b/deps/amqp10_client/BUILD.bazel index 6d865915a811..df8b879adae1 100644 --- a/deps/amqp10_client/BUILD.bazel +++ b/deps/amqp10_client/BUILD.bazel @@ -20,7 +20,7 @@ load( APP_NAME = "amqp10_client" -APP_DESCRIPTION = "AMQP 1.0 client from the RabbitMQ Project" +APP_DESCRIPTION = "AMQP 1.0 client" APP_MODULE = "amqp10_client_app" diff --git a/deps/amqp10_client/Makefile b/deps/amqp10_client/Makefile index 466bde568804..6e6629bd7a11 100644 --- a/deps/amqp10_client/Makefile +++ b/deps/amqp10_client/Makefile @@ -1,5 +1,5 @@ PROJECT = amqp10_client -PROJECT_DESCRIPTION = AMQP 1.0 client from the RabbitMQ Project +PROJECT_DESCRIPTION = AMQP 1.0 client PROJECT_MOD = amqp10_client_app define PROJECT_APP_EXTRA_KEYS diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index 32f91a5f7aea..bf00b531cc4c 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -42,8 +42,6 @@ parse_uri/1 ]). --define(DEFAULT_TIMEOUT, 5000). - -type snd_settle_mode() :: amqp10_client_session:snd_settle_mode(). -type rcv_settle_mode() :: amqp10_client_session:rcv_settle_mode(). @@ -134,7 +132,7 @@ begin_session(Connection) when is_pid(Connection) -> -spec begin_session_sync(pid()) -> supervisor:startchild_ret() | session_timeout. begin_session_sync(Connection) when is_pid(Connection) -> - begin_session_sync(Connection, ?DEFAULT_TIMEOUT). + begin_session_sync(Connection, ?TIMEOUT). %% @doc Synchronously begins an amqp10 session using 'Connection'. %% This is a convenience function that awaits the 'begun' event @@ -191,7 +189,7 @@ attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) -> {ok, Ref}; {amqp10_event, {link, Ref, {detached, Err}}} -> {error, Err} - after ?DEFAULT_TIMEOUT -> link_timeout + after ?TIMEOUT -> link_timeout end. %% @doc Attaches a sender link to a target. @@ -357,7 +355,7 @@ stop_receiver_link(#link_ref{role = receiver, send_msg(#link_ref{role = sender, session = Session, link_handle = Handle}, Msg0) -> Msg = amqp10_msg:set_handle(Handle, Msg0), - amqp10_client_session:transfer(Session, Msg, ?DEFAULT_TIMEOUT). + amqp10_client_session:transfer(Session, Msg, ?TIMEOUT). %% @doc Accept a message on a the link referred to be the 'LinkRef'. -spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok. @@ -376,7 +374,7 @@ settle_msg(LinkRef, Msg, Settlement) -> %% Flows a single link credit then awaits delivery or timeout. -spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}. get_msg(LinkRef) -> - get_msg(LinkRef, ?DEFAULT_TIMEOUT). + get_msg(LinkRef, ?TIMEOUT). %% @doc Get a single message from a link. %% Flows a single link credit then awaits delivery or timeout. diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 7b1cba641d76..ffdcfd5837a6 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -52,7 +52,6 @@ diff/2]). -define(MAX_SESSION_WINDOW_SIZE, 65535). --define(DEFAULT_TIMEOUT, 5000). -define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}). -define(INITIAL_OUTGOING_DELIVERY_ID, ?UINT_MAX). %% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6] @@ -149,7 +148,7 @@ reader :: pid(), socket :: amqp10_client_connection:amqp10_socket() | undefined, links = #{} :: #{output_handle() => #link{}}, - link_index = #{} :: #{link_name() => output_handle()}, + link_index = #{} :: #{{link_role(), link_name()} => output_handle()}, link_handle_index = #{} :: #{input_handle() => output_handle()}, next_link_handle = 0 :: output_handle(), early_attach_requests :: [term()], @@ -172,7 +171,7 @@ -spec begin_sync(pid()) -> supervisor:startchild_ret(). begin_sync(Connection) -> - begin_sync(Connection, ?DEFAULT_TIMEOUT). + begin_sync(Connection, ?TIMEOUT). -spec begin_sync(pid(), non_neg_integer()) -> supervisor:startchild_ret() | session_timeout. @@ -302,24 +301,28 @@ mapped(cast, #'v1_0.end'{error = Err}, State) -> mapped(cast, #'v1_0.attach'{name = {utf8, Name}, initial_delivery_count = IDC, handle = {uint, InHandle}, + role = PeerRoleBool, max_message_size = MaybeMaxMessageSize}, #state{links = Links, link_index = LinkIndex, link_handle_index = LHI} = State0) -> - #{Name := OutHandle} = LinkIndex, + OurRoleBool = not PeerRoleBool, + OurRole = boolean_to_role(OurRoleBool), + LinkIndexKey = {OurRole, Name}, + #{LinkIndexKey := OutHandle} = LinkIndex, #{OutHandle := Link0} = Links, ok = notify_link_attached(Link0), {DeliveryCount, MaxMessageSize} = case Link0 of - #link{role = sender, + #link{role = sender = OurRole, delivery_count = DC} -> MSS = case MaybeMaxMessageSize of {ulong, S} when S > 0 -> S; _ -> undefined end, {DC, MSS}; - #link{role = receiver, + #link{role = receiver = OurRole, max_message_size = MSS} -> {unpack(IDC), MSS} end, @@ -327,8 +330,8 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name}, input_handle = InHandle, delivery_count = DeliveryCount, max_message_size = MaxMessageSize}, - State = State0#state{links = Links#{OutHandle => Link}, - link_index = maps:remove(Name, LinkIndex), + State = State0#state{links = Links#{OutHandle := Link}, + link_index = maps:remove(LinkIndexKey, LinkIndex), link_handle_index = LHI#{InHandle => OutHandle}}, {keep_state, State}; mapped(cast, #'v1_0.detach'{handle = {uint, InHandle}, @@ -648,8 +651,8 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) -> make_source(#{role := {sender, _}}) -> #'v1_0.source'{}; -make_source(#{role := {receiver, #{address := Address} = Target, _Pid}, filter := Filter}) -> - Durable = translate_terminus_durability(maps:get(durable, Target, none)), +make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) -> + Durable = translate_terminus_durability(maps:get(durable, Source, none)), TranslatedFilter = translate_filters(Filter), #'v1_0.source'{address = {utf8, Address}, durable = {uint, Durable}, @@ -743,35 +746,34 @@ detach_with_error_cond(Link = #link{output_handle = OutHandle}, State, Cond) -> ok = send(Detach, State), Link#link{state = detach_sent}. -send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _}, - #state{next_link_handle = OutHandle0, links = Links, +send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _}, + #state{next_link_handle = OutHandle0, links = Links, link_index = LinkIndex} = State) -> Source = make_source(Args), Target = make_target(Args), Properties = amqp10_client_types:make_properties(Args), - {LinkTarget, RoleAsBool, InitialDeliveryCount, MaxMessageSize} = - case Role of + {LinkTarget, InitialDeliveryCount, MaxMessageSize} = + case RoleTuple of {receiver, _, Pid} -> - {{pid, Pid}, true, undefined, max_message_size(Args)}; + {{pid, Pid}, undefined, max_message_size(Args)}; {sender, #{address := TargetAddr}} -> - {TargetAddr, false, uint(?INITIAL_DELIVERY_COUNT), undefined} - end, - - {OutHandle, NextLinkHandle} = - case Args of - #{handle := Handle} -> - %% Client app provided link handle. - %% Really only meant for integration tests. - {Handle, OutHandle0}; - _ -> - {OutHandle0, OutHandle0 + 1} + {TargetAddr, uint(?INITIAL_DELIVERY_COUNT), undefined} end, + {OutHandle, NextLinkHandle} = case Args of + #{handle := Handle} -> + %% Client app provided link handle. + %% Really only meant for integration tests. + {Handle, OutHandle0}; + _ -> + {OutHandle0, OutHandle0 + 1} + end, + Role = element(1, RoleTuple), % create attach performative Attach = #'v1_0.attach'{name = {utf8, Name}, - role = RoleAsBool, + role = role_to_boolean(Role), handle = {uint, OutHandle}, source = Source, properties = Properties, @@ -782,12 +784,12 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _}, max_message_size = MaxMessageSize}, ok = Send(Attach, State), - LinkRef = make_link_ref(element(1, Role), self(), OutHandle), + Ref = make_link_ref(Role, self(), OutHandle), Link = #link{name = Name, - ref = LinkRef, + ref = Ref, output_handle = OutHandle, state = attach_sent, - role = element(1, Role), + role = Role, notify = FromPid, auto_flow = never, target = LinkTarget, @@ -796,7 +798,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _}, {State#state{links = Links#{OutHandle => Link}, next_link_handle = NextLinkHandle, - link_index = LinkIndex#{Name => OutHandle}}, LinkRef}. + link_index = LinkIndex#{{Role, Name} => OutHandle}}, Ref}. -spec handle_session_flow(#'v1_0.flow'{}, #state{}) -> #state{}. handle_session_flow(#'v1_0.flow'{next_incoming_id = MaybeNII, @@ -1090,6 +1092,16 @@ sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}. reason(undefined) -> normal; reason(Other) -> Other. +role_to_boolean(sender) -> + ?AMQP_ROLE_SENDER; +role_to_boolean(receiver) -> + ?AMQP_ROLE_RECEIVER. + +boolean_to_role(?AMQP_ROLE_SENDER) -> + sender; +boolean_to_role(?AMQP_ROLE_RECEIVER) -> + receiver. + format_status(Status = #{data := Data0}) -> #state{channel = Channel, remote_channel = RemoteChannel, diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index f356782f8ba7..9302b2cce6de 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -6,6 +6,8 @@ %% -module(amqp10_msg). +-include_lib("amqp10_common/include/amqp10_types.hrl"). + -export([from_amqp_records/1, to_amqp_records/1, % "read" api @@ -256,12 +258,12 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) -> new(DeliveryTag, Body, Settled) when is_binary(Body) -> #amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag}, settled = Settled, - message_format = {uint, 0}}, + message_format = {uint, ?MESSAGE_FORMAT}}, body = [#'v1_0.data'{content = Body}]}; new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types #amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag}, settled = Settled, - message_format = {uint, 0}}, + message_format = {uint, ?MESSAGE_FORMAT}}, body = Body}. %% @doc Create a new settled amqp10 message using the specified delivery tag @@ -322,8 +324,13 @@ set_properties(Props, #amqp10_msg{properties = undefined} = Msg) -> set_properties(Props, Msg#amqp10_msg{properties = #'v1_0.properties'{}}); set_properties(Props, #amqp10_msg{properties = Current} = Msg) -> % TODO many fields are `any` types and we need to try to type tag them - P = maps:fold(fun(message_id, V, Acc) when is_binary(V) -> - % message_id can be any type but we restrict it here + P = maps:fold(fun(message_id, {T, _V} = TypeVal, Acc) when T =:= ulong orelse + T =:= uuid orelse + T =:= binary orelse + T =:= uf8 -> + Acc#'v1_0.properties'{message_id = TypeVal}; + (message_id, V, Acc) when is_binary(V) -> + %% backward compat clause Acc#'v1_0.properties'{message_id = utf8(V)}; (user_id, V, Acc) when is_binary(V) -> Acc#'v1_0.properties'{user_id = {binary, V}}; diff --git a/deps/amqp10_common/include/amqp10_types.hrl b/deps/amqp10_common/include/amqp10_types.hrl index 550c2bc773f3..3068f6efb4f5 100644 --- a/deps/amqp10_common/include/amqp10_types.hrl +++ b/deps/amqp10_common/include/amqp10_types.hrl @@ -10,3 +10,10 @@ -type transfer_number() :: sequence_no(). % [2.8.10] -type sequence_no() :: uint(). + +% [2.8.1] +-define(AMQP_ROLE_SENDER, false). +-define(AMQP_ROLE_RECEIVER, true). + +% [3.2.16] +-define(MESSAGE_FORMAT, 0). diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 440567fc64f1..a221e03303bc 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -47,6 +47,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqp1_0.erl", + "src/rabbit_amqp_management.erl", "src/rabbit_amqp_reader.erl", "src/rabbit_amqp_session.erl", "src/rabbit_amqp_session_sup.erl", @@ -316,6 +317,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqp1_0.erl", + "src/rabbit_amqp_management.erl", "src/rabbit_amqp_reader.erl", "src/rabbit_amqp_session.erl", "src/rabbit_amqp_session_sup.erl", @@ -600,6 +602,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_access_control.erl", "src/rabbit_alarm.erl", "src/rabbit_amqp1_0.erl", + "src/rabbit_amqp_management.erl", "src/rabbit_amqp_reader.erl", "src/rabbit_amqp_session.erl", "src/rabbit_amqp_session_sup.erl", diff --git a/deps/rabbit/include/rabbit_amqp.hrl b/deps/rabbit/include/rabbit_amqp.hrl index 282cad9d4e47..efd7dfc663eb 100644 --- a/deps/rabbit/include/rabbit_amqp.hrl +++ b/deps/rabbit/include/rabbit_amqp.hrl @@ -25,9 +25,6 @@ %% [2.8.19] -define(MIN_MAX_FRAME_1_0_SIZE, 512). --define(SEND_ROLE, false). --define(RECV_ROLE, true). - %% for rabbit_event user_authentication_success and user_authentication_failure -define(AUTH_EVENT_KEYS, [name, diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl new file mode 100644 index 000000000000..49bbb80526f2 --- /dev/null +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -0,0 +1,163 @@ +-module(rabbit_amqp_management). + +-include("rabbit_amqp.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([process_request/4]). + +%% An HTTP message mapped to AMQP using projected mode +%% [HTTP over AMQP Working Draft 06 §4.1] +-record(msg, + { + properties :: #'v1_0.properties'{}, + application_properties :: list(), + data = [] :: [#'v1_0.data'{}] + }). + +-spec process_request(binary(), rabbit_types:vhost(), rabbit_types:user(), pid()) -> iolist(). +process_request(Request, Vhost, User, ConnectionPid) -> + ReqSections = amqp10_framing:decode_bin(Request), + ?DEBUG("~s Inbound request:~n ~tp", + [?MODULE, [amqp10_framing:pprint(Section) || Section <- ReqSections]]), + #msg{properties = #'v1_0.properties'{ + message_id = MessageId, + to = {utf8, HttpRequestTarget}, + subject = {utf8, HttpMethod}, + %% see Link Pair CS 01 §2.1 + %% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331305 + reply_to = {utf8, <<"$me">>}, + content_type = MaybeContentType}, + application_properties = _OtherHttpHeaders, + data = ReqBody + } = decode_req(ReqSections, {undefined, undefined, []}), + ReqPayload = amqp10_framing:decode_bin(list_to_binary(ReqBody)), + {RespProps0, + RespAppProps0 = #'v1_0.application_properties'{content = C}, + RespPayload} = process_http_request(HttpMethod, + HttpRequestTarget, + MaybeContentType, + ReqPayload, + Vhost, + User, + ConnectionPid), + RespProps = RespProps0#'v1_0.properties'{ + %% "To associate a response with a request, the correlation-id value of the response + %% properties MUST be set to the message-id value of the request properties." + %% [HTTP over AMQP WD 06 §5.1] + correlation_id = MessageId}, + RespAppProps = RespAppProps0#'v1_0.application_properties'{ + content = [{{utf8, <<"http:response">>}, {utf8, <<"1.1">>}} | C]}, + RespDataSect = #'v1_0.data'{content = iolist_to_binary(amqp10_framing:encode_bin(RespPayload))}, + RespSections = [RespProps, RespAppProps, RespDataSect], + [amqp10_framing:encode_bin(Sect) || Sect <- RespSections]. + +process_http_request(<<"POST">>, + <<"/$management/entities">>, + {symbol, <<"application/amqp-management+amqp", _OptionalType/binary>>}, + [ReqPayload], + Vhost, + #user{username = Username}, + _) -> + #{name := QNameBin, + durable := Durable, + auto_delete := AutoDelete, + owner := Owner, + arguments := QArgs} = decode_queue(ReqPayload), + QType = rabbit_amqqueue:get_queue_type(QArgs), + QName = rabbit_misc:r(Vhost, queue, QNameBin), + Q0 = amqqueue:new(QName, none, Durable, AutoDelete, Owner, + QArgs, Vhost, #{user => Username}, QType), + {new, _Q} = rabbit_queue_type:declare(Q0, node()), + Props = #'v1_0.properties'{ + subject = {utf8, <<"201">>}, + content_type = {symbol, <<"application/amqp-management+amqp;type=entity-collection">>} + }, + Self = {utf8, <<"/$management/queues/", QNameBin/binary>>}, + AppProps = #'v1_0.application_properties'{ + %% TODO include vhost in URI? + content = [{{utf8, <<"location">>}, Self}]}, + RespPayload = {map, [{{utf8, <<"type">>}, {utf8, <<"queue">>}}, + {{utf8, <<"id">>}, {utf8, QNameBin}}, + {{utf8, <<"self">>}, Self}, + {{utf8, <<"target">>}, {utf8, <<"/queue/", QNameBin/binary>>}}, + {{utf8, <<"management">>}, {utf8, <<"$management">>}} + ]}, + {Props, AppProps, RespPayload}; + +process_http_request(<<"POST">>, + <<"/$management/queues/", QNamePath/binary>>, + undefined, + [], + Vhost, + _User, + ConnectionPid) -> + [QNameBin, <<>>] = re:split(QNamePath, <<"/\\$management/purge$">>, [{return, binary}]), + QName = rabbit_misc:r(Vhost, queue, QNameBin), + + {ok, NumMsgs} = rabbit_amqqueue:with_exclusive_access_or_die( + QName, ConnectionPid, + fun (Q) -> + rabbit_queue_type:purge(Q) + end), + Props = #'v1_0.properties'{ + subject = {utf8, <<"200">>}, + content_type = {symbol, <<"application/amqp-management+amqp">>} + }, + AppProps = #'v1_0.application_properties'{content = []}, + RespPayload = {map, [{{utf8, <<"message_count">>}, {ulong, NumMsgs}}]}, + {Props, AppProps, RespPayload}; + +process_http_request(<<"DELETE">>, + <<"/$management/queues/", QNameBin/binary>>, + undefined, + [], + Vhost, + #user{username = Username}, + ConnectionPid) -> + QName = rabbit_misc:r(Vhost, queue, QNameBin), + {ok, NumMsgs} = rabbit_amqqueue:delete_with(QName, ConnectionPid, false, false, Username, true), + Props = #'v1_0.properties'{ + subject = {utf8, <<"200">>}, + content_type = {symbol, <<"application/amqp-management+amqp">>} + }, + AppProps = #'v1_0.application_properties'{content = []}, + RespPayload = {map, [{{utf8, <<"message_count">>}, {ulong, NumMsgs}}]}, + {Props, AppProps, RespPayload}. + +decode_queue({map, KVList}) -> + lists:foldl( + fun({{utf8, <<"type">>}, {utf8, <<"queue">>}}, Acc) -> + Acc; + ({{utf8, <<"name">>}, {utf8, V}}, Acc) -> + Acc#{name => V}; + ({{utf8, <<"durable">>}, V}, Acc) + when is_boolean(V) -> + Acc#{durable => V}; + ({{utf8, <<"exclusive">>}, V}, Acc) -> + Owner = case V of + false -> none; + true -> self() + end, + Acc#{owner => Owner}; + ({{utf8, <<"auto_delete">>}, V}, Acc) + when is_boolean(V) -> + Acc#{auto_delete => V}; + ({{utf8, <<"arguments">>}, {map, List}}, Acc) -> + Args = [{Key, longstr, V} + || {{utf8, Key = <<"x-", _/binary>>}, + {utf8, V}} <- List], + Acc#{arguments => Args} + end, #{}, KVList). + +decode_req([], {Props, AppProps, DataRev}) -> + #msg{properties = Props, + application_properties = AppProps, + data = lists:reverse(DataRev)}; +decode_req([#'v1_0.properties'{} = P | Rem], Acc) -> + decode_req(Rem, setelement(1, Acc, P)); +decode_req([#'v1_0.application_properties'{content = Content} | Rem], Acc) -> + decode_req(Rem, setelement(2, Content, Acc)); +decode_req([#'v1_0.data'{content = C} | Rem], {Props, AppProps, DataRev}) -> + decode_req(Rem, {Props, AppProps, [C | DataRev]}); +decode_req([_IgnoreOtherSection | Rem], Acc) -> + decode_req(Rem, Acc). diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 4b161bb0689c..1c1ae5b163e5 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -125,9 +125,9 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. server_properties() -> - %% The atom doesn't match anything, it's just "not 0-9-1". - Raw = lists:keydelete(<<"capabilities">>, 1, rabbit_reader:server_properties(amqp_1_0)), - {map, [{{symbol, K}, {utf8, V}} || {K, longstr, V} <- Raw]}. + Props0 = rabbit_reader:server_properties(amqp_1_0), + Props = [{{symbol, K}, {utf8, V}} || {K, longstr, V} <- Props0], + {map, Props}. %%-------------------------------------------------------------------------- @@ -489,6 +489,8 @@ handle_1_0_connection_frame( %% "the value in idle-time-out SHOULD be half the peer's actual timeout threshold" [2.4.5] idle_time_out = {uint, ReceiveTimeoutMillis div 2}, container_id = {utf8, rabbit_nodes:cluster_name()}, + offered_capabilities = {array, symbol, [{symbol, <<"AMQP_MANAGEMENT_V1_0">>}, + {symbol, <<"LINK_PAIR_V1_0">>}]}, properties = server_properties()}), State; handle_1_0_connection_frame(#'v1_0.close'{}, State0) -> diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 6b937c4c2fee..6d7aaa8d6a19 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -73,6 +73,24 @@ settled :: boolean() }). +%% For AMQP management operations, we require a link pair as described in +%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html +-record(management_link_pair, { + client_terminus_address :: tuple(), + incoming_half :: unattached | link_handle(), + outgoing_half :: unattached | link_handle() + }). + +%% Incoming or outgoing half of the link pair. +-record(management_link, { + name :: binary(), + delivery_count :: sequence_no(), + %% Credit on an incoming management link is always 1 since management + %% requests are synchronous and we grant 1 credit when sending a response. + credit :: non_neg_integer(), + max_message_size :: unlimited | pos_integer() + }). + -record(incoming_link, { exchange :: rabbit_exchange:name(), routing_key :: undefined | rabbit_types:routing_key(), @@ -105,8 +123,7 @@ %% The queue sent us this consumer scoped sequence number. msg_id :: rabbit_amqqueue:msg_id(), consumer_tag :: rabbit_types:ctag(), - queue_name :: rabbit_amqqueue:name(), - delivered_at :: integer() + queue_name :: rabbit_amqqueue:name() }). -record(pending_transfer, { @@ -118,6 +135,10 @@ outgoing_unsettled :: #outgoing_unsettled{} }). +-record(pending_management_transfer, { + frames :: iolist() + }). + -record(cfg, { outgoing_max_frame_size :: unlimited | pos_integer(), reader_pid :: rabbit_types:connection(), @@ -190,7 +211,9 @@ %% Even when we are limited by session flow control, we must make sure to first send the TRANSFER to the %% client (once the remote_incoming_window got opened) followed by the FLOW with drain=true and credit=0 %% and advanced delivery count. Otherwise, we would violate the AMQP protocol spec. - outgoing_pending = queue:new() :: queue:queue(#pending_transfer{} | #'v1_0.flow'{}), + outgoing_pending = queue:new() :: queue:queue(#pending_transfer{} | + #pending_management_transfer{} | + #'v1_0.flow'{}), %% The link or session endpoint assigns each message a unique delivery-id %% from a session scoped sequence number. @@ -212,6 +235,10 @@ %% We send messages to clients on outgoing links. outgoing_links = #{} :: #{link_handle() => #outgoing_link{}}, + management_link_pairs = #{} :: #{LinkName :: binary() => #management_link_pair{}}, + incoming_management_links = #{} :: #{link_handle() => #management_link{}}, + outgoing_management_links = #{} :: #{link_handle() => #management_link{}}, + %% TRANSFER delivery IDs published to consuming clients but not yet acknowledged by clients. outgoing_unsettled_map = #{} :: #{delivery_number() => #outgoing_unsettled{}}, @@ -695,15 +722,130 @@ disposition(DeliveryState, First, Last) -> ?UINT(Last) end, #'v1_0.disposition'{ - role = ?RECV_ROLE, + role = ?AMQP_ROLE_RECEIVER, settled = true, state = DeliveryState, first = ?UINT(First), last = Last1}. -handle_control(#'v1_0.attach'{role = ?SEND_ROLE, +handle_control(#'v1_0.attach'{ + role = ?AMQP_ROLE_SENDER, + snd_settle_mode = SndSettleMode, + name = Name = {utf8, LinkName}, + handle = Handle = ?UINT(HandleInt), + source = Source = #'v1_0.source'{address = ClientTerminusAddress}, + target = Target = #'v1_0.target'{address = {utf8, <<"$management">>}}, + initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt), + properties = Properties + } = Attach, + #state{management_link_pairs = Pairs0, + incoming_management_links = Links + } = State0) -> + ok = validate_attach(Attach), + ok = check_paired(Properties), + Pairs = case Pairs0 of + #{LinkName := #management_link_pair{ + client_terminus_address = ClientTerminusAddress, + incoming_half = unattached, + outgoing_half = H} = Pair} + when is_integer(H) -> + maps:update(LinkName, + Pair#management_link_pair{incoming_half = HandleInt}, + Pairs0); + #{LinkName := Other} -> + protocol_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + "received invalid attach ~p for management link pair ~p", + [Attach, Other]); + _ -> + maps:put(LinkName, + #management_link_pair{client_terminus_address = ClientTerminusAddress, + incoming_half = HandleInt, + outgoing_half = unattached}, + Pairs0) + end, + Credit = 1, + MaxMessageSize = persistent_term:get(max_message_size), + Link = #management_link{name = LinkName, + delivery_count = DeliveryCountInt, + credit = Credit, + max_message_size = MaxMessageSize}, + State = State0#state{management_link_pairs = Pairs, + incoming_management_links = maps:put(HandleInt, Link, Links)}, + Reply = #'v1_0.attach'{ + name = Name, + handle = Handle, + %% We are the receiver. + role = ?AMQP_ROLE_RECEIVER, + snd_settle_mode = SndSettleMode, + rcv_settle_mode = ?V_1_0_RECEIVER_SETTLE_MODE_FIRST, + source = Source, + target = Target, + max_message_size = {ulong, MaxMessageSize}, + properties = Properties}, + Flow = #'v1_0.flow'{handle = Handle, + delivery_count = DeliveryCount, + link_credit = ?UINT(Credit)}, + reply0([Reply, Flow], State); + +handle_control(#'v1_0.attach'{ + role = ?AMQP_ROLE_RECEIVER, + name = Name = {utf8, LinkName}, + handle = Handle = ?UINT(HandleInt), + source = Source = #'v1_0.source'{address = {utf8, <<"$management">>}}, + target = Target = #'v1_0.target'{address = ClientTerminusAddress}, + rcv_settle_mode = RcvSettleMode, + max_message_size = MaybeMaxMessageSize, + properties = Properties + } = Attach, + #state{management_link_pairs = Pairs0, + outgoing_management_links = Links + } = State0) -> + ok = validate_attach(Attach), + ok = check_paired(Properties), + Pairs = case Pairs0 of + #{LinkName := #management_link_pair{ + client_terminus_address = ClientTerminusAddress, + incoming_half = H, + outgoing_half = unattached} = Pair} + when is_integer(H) -> + maps:update(LinkName, + Pair#management_link_pair{outgoing_half = HandleInt}, + Pairs0); + #{LinkName := Other} -> + protocol_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + "received invalid attach ~p for management link pair ~p", + [Attach, Other]); + _ -> + maps:put(LinkName, + #management_link_pair{client_terminus_address = ClientTerminusAddress, + incoming_half = unattached, + outgoing_half = HandleInt}, + Pairs0) + end, + MaxMessageSize = max_message_size(MaybeMaxMessageSize), + Link = #management_link{name = LinkName, + delivery_count = ?INITIAL_DELIVERY_COUNT, + credit = 0, + max_message_size = MaxMessageSize}, + State = State0#state{management_link_pairs = Pairs, + outgoing_management_links = maps:put(HandleInt, Link, Links)}, + Reply = #'v1_0.attach'{ + name = Name, + handle = Handle, + role = ?AMQP_ROLE_SENDER, + snd_settle_mode = ?V_1_0_SENDER_SETTLE_MODE_SETTLED, + rcv_settle_mode = RcvSettleMode, + source = Source, + target = Target, + initial_delivery_count = ?UINT(?INITIAL_DELIVERY_COUNT), + %% Echo back that we will respect the client's requested max-message-size. + max_message_size = MaybeMaxMessageSize, + properties = Properties}, + reply0(Reply, State); + +handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, name = LinkName, - handle = InputHandle = ?UINT(HandleInt), + handle = Handle = ?UINT(HandleInt), source = Source, snd_settle_mode = SndSettleMode, target = Target, @@ -722,21 +864,20 @@ handle_control(#'v1_0.attach'{role = ?SEND_ROLE, delivery_count = DeliveryCountInt, credit = ?LINK_CREDIT_RCV}, _Outcomes = outcomes(Source), - OutputHandle = output_handle(InputHandle), Reply = #'v1_0.attach'{ name = LinkName, - handle = OutputHandle, + handle = Handle, source = Source, snd_settle_mode = SndSettleMode, rcv_settle_mode = ?V_1_0_RECEIVER_SETTLE_MODE_FIRST, target = Target, %% We are the receiver. - role = ?RECV_ROLE, + role = ?AMQP_ROLE_RECEIVER, max_message_size = {ulong, persistent_term:get(max_message_size)}}, - Flow = #'v1_0.flow'{handle = OutputHandle, + Flow = #'v1_0.flow'{handle = Handle, delivery_count = DeliveryCount, link_credit = ?UINT(?LINK_CREDIT_RCV)}, - %%TODO check that handle is not present in either incoming_links or outgoing_links: + %%TODO check that handle is not in use for any other open links. %%"The handle MUST NOT be used for other open links. An attempt to attach %% using a handle which is already associated with a link MUST be responded to %% with an immediate close carrying a handle-in-use session-error." @@ -751,9 +892,9 @@ handle_control(#'v1_0.attach'{role = ?SEND_ROLE, [Reason]) end; -handle_control(#'v1_0.attach'{role = ?RECV_ROLE, +handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, name = LinkName, - handle = InputHandle = ?UINT(HandleInt), + handle = Handle = ?UINT(HandleInt), source = Source, snd_settle_mode = SndSettleMode, rcv_settle_mode = RcvSettleMode, @@ -821,10 +962,9 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE, acting_user => Username}, case rabbit_queue_type:consume(Q, Spec, QStates0) of {ok, QStates} -> - OutputHandle = output_handle(InputHandle), A = #'v1_0.attach'{ name = LinkName, - handle = OutputHandle, + handle = Handle, initial_delivery_count = ?UINT(?INITIAL_DELIVERY_COUNT), snd_settle_mode = EffectiveSndSettleMode, rcv_settle_mode = RcvSettleMode, @@ -834,26 +974,15 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE, source = Source#'v1_0.source'{ default_outcome = #'v1_0.released'{}, outcomes = outcomes(Source)}, - role = ?SEND_ROLE, + role = ?AMQP_ROLE_SENDER, %% Echo back that we will respect the client's requested max-message-size. max_message_size = MaybeMaxMessageSize}, - MaxMessageSize = case MaybeMaxMessageSize of - {ulong, Size} when Size > 0 -> - Size; - _ -> - %% "If this field is zero or unset, there is no - %% maximum size imposed by the link endpoint." - unlimited - end, + MaxMessageSize = max_message_size(MaybeMaxMessageSize), Link = #outgoing_link{queue_name_bin = QNameBin, queue_type = QType, send_settled = SndSettled, max_message_size = MaxMessageSize, delivery_count = DeliveryCount}, - %%TODO check that handle is not present in either incoming_links or outgoing_links: - %%"The handle MUST NOT be used for other open links. An attempt to attach - %% using a handle which is already associated with a link MUST be responded to - %% with an immediate close carrying a handle-in-use session-error." OutgoingLinks = OutgoingLinks0#{HandleInt => Link}, State1 = State0#state{queue_states = QStates, outgoing_links = OutgoingLinks}, @@ -882,63 +1011,68 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE, handle_control({Txfr = #'v1_0.transfer'{handle = ?UINT(Handle)}, MsgPart}, State0 = #state{incoming_links = IncomingLinks}) -> + {Flows, State1} = session_flow_control_received_transfer(State0), + + {Reply, State} = case IncomingLinks of #{Handle := Link0} -> - {Flows, State1} = session_flow_control_received_transfer(State0), - case incoming_link_transfer(Txfr, MsgPart, Link0, State1) of + case incoming_link_transfer(Txfr, MsgPart, Link0, State1) of {ok, Reply0, Link, State2} -> - Reply = Reply0 ++ Flows, - State = State2#state{incoming_links = maps:update(Handle, Link, IncomingLinks)}, - reply0(Reply, State); + {Reply0, State2#state{incoming_links = IncomingLinks#{Handle := Link}}}; {error, Reply0} -> %% "When an error occurs at a link endpoint, the endpoint MUST be detached %% with appropriate error information supplied in the error field of the %% detach frame. The link endpoint MUST then be destroyed." [2.6.5] - Reply = Reply0 ++ Flows, - State = State1#state{incoming_links = maps:remove(Handle, IncomingLinks)}, - reply0(Reply, State) + {Reply0, State1#state{incoming_links = maps:remove(Handle, IncomingLinks)}} end; _ -> - protocol_error(?V_1_0_AMQP_ERROR_ILLEGAL_STATE, - "Unknown link handle: ~p", [Handle]) - end; + incoming_mgmt_link_transfer(Txfr, MsgPart, State1) + end, + reply0(Reply ++ Flows, State); + %% Flow control. These frames come with two pieces of information: %% the session window, and optionally, credit for a particular link. %% We'll deal with each of them separately. handle_control(#'v1_0.flow'{handle = Handle} = Flow, #state{incoming_links = IncomingLinks, - outgoing_links = OutgoingLinks} = State0) -> + outgoing_links = OutgoingLinks, + incoming_management_links = IncomingMgmtLinks, + outgoing_management_links = OutgoingMgmtLinks + } = State0) -> State = session_flow_control_received_flow(Flow, State0), - case Handle of - undefined -> - %% "If not set, the flow frame is carrying only information - %% pertaining to the session endpoint." [2.7.4] - {noreply, State}; - ?UINT(HandleInt) -> - %% "If set, indicates that the flow frame carries flow state information - %% for the local link endpoint associated with the given handle." [2.7.4] - case OutgoingLinks of - #{HandleInt := OutgoingLink} -> - {noreply, handle_outgoing_link_flow_control(OutgoingLink, Flow, State)}; - _ -> - case IncomingLinks of - #{HandleInt := _IncomingLink} -> - %% We're being told about available messages at - %% the sender. Yawn. TODO at least check transfer-count? - {noreply, State}; - _ -> - %% "If set to a handle that is not currently associated with - %% an attached link, the recipient MUST respond by ending the - %% session with an unattached-handle session error." [2.7.4] - rabbit_log:warning( - "Received Flow frame for unknown link handle: ~tp", [Flow]), - protocol_error( - ?V_1_0_SESSION_ERROR_UNATTACHED_HANDLE, - "Unattached link handle: ~b", [HandleInt]) - end - end - end; + S = case Handle of + undefined -> + %% "If not set, the flow frame is carrying only information + %% pertaining to the session endpoint." [2.7.4] + State; + ?UINT(HandleInt) -> + %% "If set, indicates that the flow frame carries flow state information + %% for the local link endpoint associated with the given handle." [2.7.4] + case OutgoingLinks of + #{HandleInt := OutgoingLink} -> + handle_outgoing_link_flow_control(OutgoingLink, Flow, State); + _ -> + case OutgoingMgmtLinks of + #{HandleInt := OutgoingMgmtLink} -> + handle_outgoing_mgmt_link_flow_control(OutgoingMgmtLink, Flow, State); + _ when is_map_key(HandleInt, IncomingLinks) orelse + is_map_key(HandleInt, IncomingMgmtLinks) -> + %% We're being told about available messages at the sender. + State; + _ -> + %% "If set to a handle that is not currently associated with + %% an attached link, the recipient MUST respond by ending the + %% session with an unattached-handle session error." [2.7.4] + rabbit_log:warning( + "Received Flow frame for unknown link handle: ~tp", [Flow]), + protocol_error( + ?V_1_0_SESSION_ERROR_UNATTACHED_HANDLE, + "Unattached link handle: ~b", [HandleInt]) + end + end + end, + {noreply, S}; handle_control(#'v1_0.detach'{handle = Handle = ?UINT(HandleInt), closed = Closed}, @@ -1026,7 +1160,7 @@ handle_control(#'v1_0.end'{}, end, {stop, normal, State}; -handle_control(#'v1_0.disposition'{role = ?RECV_ROLE, +handle_control(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER, first = ?UINT(First), last = Last0, state = Outcome, @@ -1097,7 +1231,7 @@ handle_control(#'v1_0.disposition'{role = ?RECV_ROLE, Reply = case DispositionSettled of true -> []; false -> [Disposition#'v1_0.disposition'{settled = true, - role = ?SEND_ROLE}] + role = ?AMQP_ROLE_SENDER}] end, State = handle_queue_actions(Actions, State1), reply0(Reply, State) @@ -1144,11 +1278,11 @@ send_pending(#state{remote_incoming_window = Space, end, {NumTransfersSent, Buf, State1} = case send_frames(SendFun, Frames, Space) of - {all, SpaceLeft} -> + {sent_all, SpaceLeft} -> {Space - SpaceLeft, Buf1, record_outgoing_unsettled(Pending, State0)}; - {some, Rest} -> + {sent_some, Rest} -> {Space, queue:in_r(Pending#pending_transfer{frames = Rest}, Buf1), State0} @@ -1156,18 +1290,32 @@ send_pending(#state{remote_incoming_window = Space, State2 = session_flow_control_sent_transfers(NumTransfersSent, State1), State = State2#state{outgoing_pending = Buf}, send_pending(State); - {{value, #pending_transfer{}}, _} - when Space =:= 0 -> + {{value, Pending = #pending_management_transfer{frames = Frames}}, Buf1} + when Space > 0 -> + SendFun = fun(Transfer, Sections) -> + rabbit_amqp_writer:send_command(WriterPid, Ch, Transfer, Sections) + end, + {NumTransfersSent, Buf} = + case send_frames(SendFun, Frames, Space) of + {sent_all, SpaceLeft} -> + {Space - SpaceLeft, Buf1}; + {sent_some, Rest} -> + {Space, queue:in_r(Pending#pending_management_transfer{frames = Rest}, Buf1)} + end, + State1 = session_flow_control_sent_transfers(NumTransfersSent, State0), + State = State1#state{outgoing_pending = Buf}, + send_pending(State); + _ when Space =:= 0 -> State0 end. -send_frames(_, [], Left) -> - {all, Left}; +send_frames(_, [], SpaceLeft) -> + {sent_all, SpaceLeft}; send_frames(_, Rest, 0) -> - {some, Rest}; -send_frames(SendFun, [[Transfer, Sections] | Rest], Left) -> + {sent_some, Rest}; +send_frames(SendFun, [[Transfer, Sections] | Rest], SpaceLeft) -> ok = SendFun(Transfer, Sections), - send_frames(SendFun, Rest, Left - 1). + send_frames(SendFun, Rest, SpaceLeft - 1). record_outgoing_unsettled(#pending_transfer{queue_ack_required = true, delivery_id = DeliveryId, @@ -1269,10 +1417,15 @@ settle_op_from_outcome(Outcome) -> "Unrecognised state: ~tp in DISPOSITION", [Outcome]). +-spec flow({uint, link_handle()}, sequence_no()) -> #'v1_0.flow'{}. flow(Handle, DeliveryCount) -> + flow(Handle, DeliveryCount, ?LINK_CREDIT_RCV). + +-spec flow({uint, link_handle()}, sequence_no(), non_neg_integer()) -> #'v1_0.flow'{}. +flow(Handle, DeliveryCount, LinkCredit) -> #'v1_0.flow'{handle = Handle, delivery_count = ?UINT(DeliveryCount), - link_credit = ?UINT(?LINK_CREDIT_RCV)}. + link_credit = ?UINT(LinkCredit)}. session_flow_fields(Frames, State) when is_list(Frames) -> @@ -1423,7 +1576,7 @@ handle_deliver(ConsumerTag, AckRequired, handle = ?UINT(Handle), delivery_id = ?UINT(DeliveryId), delivery_tag = {binary, Dtag}, - message_format = ?UINT(0), % [3.2.16] + message_format = ?UINT(?MESSAGE_FORMAT), settled = SendSettled}, Mc1 = mc:convert(mc_amqp, Mc0), Mc = mc:set_annotation(redelivered, Redelivered, Mc1), @@ -1433,14 +1586,7 @@ handle_deliver(ConsumerTag, AckRequired, [?MODULE, [amqp10_framing:pprint(Section) || Section <- amqp10_framing:decode_bin(iolist_to_binary(Sections))]]), validate_message_size(Sections, MaxMessageSize), - Frames = case MaxFrameSize of - unlimited -> - [[Transfer, Sections]]; - _ -> - %% TODO Ugh - TLen = iolist_size(amqp10_framing:encode_bin(Transfer)), - encode_frames(Transfer, Sections, MaxFrameSize - TLen, []) - end, + Frames = transfer_frames(Transfer, Sections, MaxFrameSize), messages_delivered(Redelivered, QType), rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, Trace), OutgoingLinks = case DelCount of @@ -1453,11 +1599,7 @@ handle_deliver(ConsumerTag, AckRequired, Del = #outgoing_unsettled{ msg_id = MsgId, consumer_tag = ConsumerTag, - queue_name = QName, - %% The consumer timeout interval starts already from the point in time the - %% queue sent us the message so that the Ra log can be truncated even if - %% the message is sitting here for a long time. - delivered_at = os:system_time(millisecond)}, + queue_name = QName}, PendingTransfer = #pending_transfer{ frames = Frames, queue_ack_required = AckRequired, @@ -1505,6 +1647,92 @@ delivery_tag(MsgId = {Priority, Seq}, _) %%% Incoming Link %%% %%%%%%%%%%%%%%%%%%%%% +incoming_mgmt_link_transfer( + #'v1_0.transfer'{ + %%TODO confirm or reject incoming message if incoming settled=false + delivery_id = _MaybeIncomingDeliveryId, + settled = MaybeSettled, + %% In the current implementation, we disallow large incoming management request messages. + more = false, + rcv_settle_mode = RcvSettleMode, + handle = IncomingHandle = ?UINT(IncomingHandleInt)}, + Request, + #state{management_link_pairs = LinkPairs, + incoming_management_links = IncomingLinks, + outgoing_management_links = OutgoingLinks, + outgoing_pending = Pending, + outgoing_delivery_id = OutgoingDeliveryId, + cfg = #cfg{outgoing_max_frame_size = MaxFrameSize, + vhost = Vhost, + user = User, + reader_pid = ReaderPid} + } = State0) -> + + IncomingLink0 = case maps:find(IncomingHandleInt, IncomingLinks) of + {ok, Link} -> + Link; + error -> + protocol_error(?V_1_0_AMQP_ERROR_ILLEGAL_STATE, + "Unknown link handle: ~p", [IncomingHandleInt]) + end, + #management_link{name = Name, + delivery_count = IncomingDeliveryCount0, + credit = 1, + max_message_size = IncomingMaxMessageSize + } = IncomingLink0, + #management_link_pair{ + incoming_half = IncomingHandleInt, + outgoing_half = OutgoingHandleInt + } = maps:get(Name, LinkPairs), + OutgoingLink0 = case OutgoingHandleInt of + unattached -> + protocol_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + "received transfer on half open management link pair", []); + _ -> + maps:get(OutgoingHandleInt, OutgoingLinks) + end, + #management_link{name = Name, + delivery_count = OutgoingDeliveryCount, + credit = OutgoingCredit, + max_message_size = OutgoingMaxMessageSize} = OutgoingLink0, + case OutgoingCredit > 0 of + true -> + ok; + false -> + protocol_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + "insufficient credit (~b) for management link from server to client", + [OutgoingCredit]) + end, + Settled = default(MaybeSettled, false), + validate_transfer_rcv_settle_mode(RcvSettleMode, Settled), + validate_message_size(Request, IncomingMaxMessageSize), + Response = rabbit_amqp_management:process_request(Request, Vhost, User, ReaderPid), + + Transfer = #'v1_0.transfer'{ + handle = ?UINT(OutgoingHandleInt), + delivery_id = ?UINT(OutgoingDeliveryId), + delivery_tag = {binary, <<>>}, + message_format = ?UINT(?MESSAGE_FORMAT), + settled = true}, + ?DEBUG("~s Outbound content:~n ~tp~n", + [?MODULE, [amqp10_framing:pprint(Section) || + Section <- amqp10_framing:decode_bin(iolist_to_binary(Respon))]]), + validate_message_size(Response, OutgoingMaxMessageSize), + Frames = transfer_frames(Transfer, Response, MaxFrameSize), + PendingTransfer = #pending_management_transfer{frames = Frames}, + IncomingDeliveryCount = add(IncomingDeliveryCount0, 1), + IncomingLink = IncomingLink0#management_link{delivery_count = IncomingDeliveryCount}, + OutgoingLink = OutgoingLink0#management_link{delivery_count = add(OutgoingDeliveryCount, 1), + credit = OutgoingCredit - 1}, + State = State0#state{ + outgoing_delivery_id = add(OutgoingDeliveryId, 1), + outgoing_pending = queue:in(PendingTransfer, Pending), + incoming_management_links = maps:update(IncomingHandleInt, IncomingLink, IncomingLinks), + outgoing_management_links = maps:update(OutgoingHandleInt, OutgoingLink, OutgoingLinks)}, + + Flow = flow(IncomingHandle, IncomingDeliveryCount, 1), + {[Flow], State}. + incoming_link_transfer( #'v1_0.transfer'{more = true, %% "The delivery-id MUST be supplied on the first transfer of a multi-transfer delivery." @@ -1581,6 +1809,7 @@ incoming_link_transfer( ok = validate_multi_transfer_settled(MaybeSettled, FirstSettled), {MsgBin0, FirstDeliveryId, FirstSettled} end, + validate_transfer_rcv_settle_mode(RcvSettleMode, Settled), validate_incoming_message_size(MsgBin), Sections = amqp10_framing:decode_bin(MsgBin), @@ -1601,12 +1830,6 @@ incoming_link_transfer( check_write_permitted_on_topic(Exchange, User, RoutingKey), QNames = rabbit_exchange:route(Exchange, Mc, #{return_binding_keys => true}), rabbit_trace:tap_in(Mc, QNames, ConnName, ChannelNum, Username, Trace), - case not Settled andalso - RcvSettleMode =:= ?V_1_0_RECEIVER_SETTLE_MODE_SECOND of - true -> protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, - "rcv-settle-mode second not supported", []); - false -> ok - end, Opts = #{correlation => {HandleInt, DeliveryId}}, Qs0 = rabbit_amqqueue:lookup_many(QNames), Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), @@ -1671,7 +1894,7 @@ process_routing_confirm([_|_] = Qs, SenderSettles, DeliveryId, U0) -> {U, []}. released(DeliveryId) -> - #'v1_0.disposition'{role = ?RECV_ROLE, + #'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER, first = ?UINT(DeliveryId), settled = true, state = #'v1_0.released'{}}. @@ -1731,6 +1954,41 @@ ensure_target(#'v1_0.target'{address = Address, {error, {address_not_utf8_string, Address}} end. +handle_outgoing_mgmt_link_flow_control( + #management_link{delivery_count = DeliveryCountSnd} = Link0, + #'v1_0.flow'{handle = Handle = ?UINT(HandleInt), + delivery_count = MaybeDeliveryCountRcv, + link_credit = ?UINT(LinkCreditRcv), + drain = Drain0, + echo = Echo0}, + #state{outgoing_management_links = Links0, + outgoing_pending = Pending + } = State0) -> + Drain = default(Drain0, false), + Echo = default(Echo0, false), + DeliveryCountRcv = delivery_count_rcv(MaybeDeliveryCountRcv), + LinkCreditSnd = link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd), + {Count, Credit} = case Drain of + true -> {add(DeliveryCountSnd, LinkCreditSnd), 0}; + false -> {DeliveryCountSnd, LinkCreditSnd} + end, + State = case Echo orelse Drain of + true -> + Flow = #'v1_0.flow'{ + handle = Handle, + delivery_count = ?UINT(Count), + link_credit = ?UINT(Credit), + available = ?UINT(0), + drain = Drain}, + State0#state{outgoing_pending = queue:in(Flow, Pending)}; + false -> + State0 + end, + Link = Link0#management_link{delivery_count = Count, + credit = Credit}, + Links = maps:update(HandleInt, Link, Links0), + State#state{outgoing_management_links = Links}. + handle_outgoing_link_flow_control( #outgoing_link{queue_name_bin = QNameBin, delivery_count = MaybeDeliveryCountSnd}, @@ -1741,19 +1999,9 @@ handle_outgoing_link_flow_control( echo = Echo0}, State0 = #state{queue_states = QStates0, cfg = #cfg{vhost = Vhost}}) -> - DeliveryCountRcv = case MaybeDeliveryCountRcv of - ?UINT(Count) -> - Count; - undefined -> - %% "In the event that the receiver does not yet know the delivery-count, - %% i.e., delivery-countrcv is unspecified, the sender MUST assume that the - %% delivery-countrcv is the first delivery-countsnd sent from sender to - %% receiver, i.e., the delivery-countsnd specified in the flow state carried - %% by the initial attach frame from the sender to the receiver." [2.6.7] - ?INITIAL_DELIVERY_COUNT - end, - Ctag = handle_to_ctag(HandleInt), QName = rabbit_misc:r(Vhost, queue, QNameBin), + Ctag = handle_to_ctag(HandleInt), + DeliveryCountRcv = delivery_count_rcv(MaybeDeliveryCountRcv), Drain = default(Drain0, false), Echo = default(Echo0, false), case MaybeDeliveryCountSnd of @@ -1766,13 +2014,26 @@ handle_outgoing_link_flow_control( %% thanks to the queue event containing the consumer tag. State; {credit_api_v1, DeliveryCountSnd} -> - LinkCreditSnd = diff(add(DeliveryCountRcv, LinkCreditRcv), DeliveryCountSnd), + LinkCreditSnd = link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd), {ok, QStates, Actions} = rabbit_queue_type:credit_v1(QName, Ctag, LinkCreditSnd, Drain, QStates0), State1 = State0#state{queue_states = QStates}, State = handle_queue_actions(Actions, State1), process_credit_reply_sync(Ctag, QName, LinkCreditSnd, State) end. +delivery_count_rcv(?UINT(DeliveryCount)) -> + DeliveryCount; +delivery_count_rcv(undefined) -> + %% "In the event that the receiver does not yet know the delivery-count, + %% i.e., delivery-countrcv is unspecified, the sender MUST assume that the + %% delivery-countrcv is the first delivery-countsnd sent from sender to + %% receiver, i.e., the delivery-countsnd specified in the flow state carried + %% by the initial attach frame from the sender to the receiver." [2.6.7] + ?INITIAL_DELIVERY_COUNT. + +link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd) -> + diff(add(DeliveryCountRcv, LinkCreditRcv), DeliveryCountSnd). + %% The AMQP 0.9.1 credit extension was poorly designed because a consumer granting %% credits to a queue has to synchronously wait for a credit reply from the queue: %% https://github.com/rabbitmq/rabbitmq-server/blob/b9566f4d02f7ceddd2f267a92d46affd30fb16c8/deps/rabbitmq_codegen/credit_extension.json#L43 @@ -1892,6 +2153,13 @@ ensure_source(#'v1_0.source'{address = Address, {error, {address_not_utf8_string, Address}} end. +transfer_frames(Transfer, Sections, unlimited) -> + [[Transfer, Sections]]; +transfer_frames(Transfer, Sections, MaxFrameSize) -> + %% TODO Ugh + TLen = iolist_size(amqp10_framing:encode_bin(Transfer)), + encode_frames(Transfer, Sections, MaxFrameSize - TLen, []). + encode_frames(_T, _Msg, MaxContentLen, _Transfers) when MaxContentLen =< 0 -> protocol_error(?V_1_0_AMQP_ERROR_FRAME_SIZE_TOO_SMALL, "Frame size is too small by ~tp bytes", @@ -2017,6 +2285,14 @@ validate_multi_transfer_settled(Other, First) "(interpreted) field 'settled' on first transfer (~p)", [Other, First]). +%% "If the message is being sent settled by the sender, +%% the value of this field [rcv-settle-mode] is ignored." [2.7.5] +validate_transfer_rcv_settle_mode(?V_1_0_RECEIVER_SETTLE_MODE_SECOND, _Settled = false) -> + protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, + "rcv-settle-mode second not supported", []); +validate_transfer_rcv_settle_mode(_, _) -> + ok. + validate_incoming_message_size(Message) -> validate_message_size(Message, persistent_term:get(max_message_size)). @@ -2146,14 +2422,6 @@ queue_is_durable(undefined) -> %% [3.5.3] queue_is_durable(?V_1_0_TERMINUS_DURABILITY_NONE). -%% "The two endpoints are not REQUIRED to use the same handle. This means a peer -%% is free to independently chose its handle when a link endpoint is associated -%% with the session. The locally chosen handle is referred to as the output handle. -%% The remotely chosen handle is referred to as the input handle." [2.6.2] -%% For simplicity, we choose to use the same handle. -output_handle(InputHandle) -> - _Outputhandle = InputHandle. - -spec remove_link_from_outgoing_unsettled_map(link_handle() | rabbit_types:ctag(), Map) -> {Map, [rabbit_amqqueue:msg_id()]} when Map :: #{delivery_number() => #outgoing_unsettled{}}. @@ -2296,6 +2564,33 @@ check_user_id(Mc, User) -> protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Reason, Args) end. +max_message_size({ulong, Size}) + when Size > 0 -> + Size; +max_message_size(_) -> + %% "If this field is zero or unset, there is no + %% maximum size imposed by the link endpoint." + unlimited. + +check_paired({map, Properties}) -> + case lists:any(fun({{symbol, <<"paired">>}, true}) -> + true; + (_) -> + false + end, Properties) of + true -> + ok; + false -> + property_paired_not_set() + end; +check_paired(_) -> + property_paired_not_set(). + +-spec property_paired_not_set() -> no_return(). +property_paired_not_set() -> + protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, + "Link property 'paired' is not set to boolean value 'true'", []). + format_status( #{state := #state{cfg = Cfg, outgoing_pending = OutgoingPending, diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index bb5268450d7e..005c92afc6b3 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -231,7 +231,7 @@ server_properties(Protocol) -> NormalizedConfigServerProps = [{<<"capabilities">>, table, server_capabilities(Protocol)} | [case X of - {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), + {KeyAtom, Value} -> {atom_to_binary(KeyAtom), longstr, maybe_list_to_binary(Value)}; {BinKey, Type, Value} -> {BinKey, Type, Value} diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index c450ca06e346..43c6a135a173 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -1126,7 +1126,7 @@ events(Config) -> Pid = proplists:lookup(pid, Props), ClientProperties = {client_properties, List} = proplists:lookup(client_properties, Props), ?assert(lists:member( - {<<"product">>, longstr, <<"AMQP 1.0 client from the RabbitMQ Project">>}, + {<<"product">>, longstr, <<"AMQP 1.0 client">>}, List)), ?assert(lists:member( {<<"ignore-maintenance">>, bool, true}, diff --git a/deps/rabbitmq_amqp_client/.gitignore b/deps/rabbitmq_amqp_client/.gitignore new file mode 100644 index 000000000000..0de8bdab4c4f --- /dev/null +++ b/deps/rabbitmq_amqp_client/.gitignore @@ -0,0 +1,17 @@ +.sw? +.*.sw? +*.beam +/.erlang.mk/ +/cover/ +/deps/ +/doc/ +/ebin/ +/escript/ +/escript.lock +/logs/ +/plugins/ +/plugins.lock +/sbin/ +/sbin.lock + +/rabbitmq_amqp_client.d diff --git a/deps/rabbitmq_amqp_client/BUILD.bazel b/deps/rabbitmq_amqp_client/BUILD.bazel new file mode 100644 index 000000000000..19a9a3a122c4 --- /dev/null +++ b/deps/rabbitmq_amqp_client/BUILD.bazel @@ -0,0 +1,90 @@ +load("@rules_erlang//:eunit2.bzl", "eunit") +load("@rules_erlang//:xref2.bzl", "xref") +load("@rules_erlang//:dialyze.bzl", "dialyze", "plt") +load( + "//:rabbitmq.bzl", + "RABBITMQ_DIALYZER_OPTS", + "assert_suites", + "broker_for_integration_suites", + "rabbitmq_app", + "rabbitmq_integration_suite", + "rabbitmq_suite", +) +load( + ":app.bzl", + "all_beam_files", + "all_srcs", + "all_test_beam_files", + "test_suite_beam_files", +) + +APP_NAME = "rabbitmq_amqp_client" + +APP_DESCRIPTION = "AMQP 1.0 client for RabbitMQ" + +all_beam_files(name = "all_beam_files") + +all_test_beam_files(name = "all_test_beam_files") + +all_srcs(name = "all_srcs") + +test_suite_beam_files(name = "test_suite_beam_files") + +rabbitmq_app( + name = "erlang_app", + srcs = [":all_srcs"], + hdrs = [":public_hdrs"], + app_description = APP_DESCRIPTION, + app_name = APP_NAME, + beam_files = [":beam_files"], + license_files = [":license_files"], + priv = [":priv"], + deps = [ + "//deps/amqp10_client:erlang_app", + "//deps/amqp10_common:erlang_app", + ], +) + +xref( + name = "xref", + target = ":erlang_app", +) + +plt( + name = "deps_plt", + for_target = ":erlang_app", + plt = "//:base_plt", +) + +dialyze( + name = "dialyze", + dialyzer_opts = RABBITMQ_DIALYZER_OPTS, + plt = ":deps_plt", + target = ":erlang_app", +) + +broker_for_integration_suites( +) + +TEST_DEPS = [ + "//deps/amqp10_client:erlang_app", +] + +rabbitmq_integration_suite( + name = "management_SUITE", + size = "medium", + deps = TEST_DEPS, +) + +assert_suites() + +alias( + name = "rabbitmq_amqp_client", + actual = ":erlang_app", + visibility = ["//visibility:public"], +) + +eunit( + name = "eunit", + target = ":test_erlang_app", +) diff --git a/deps/rabbitmq_amqp_client/LICENSE b/deps/rabbitmq_amqp_client/LICENSE new file mode 100644 index 000000000000..1699234a3e89 --- /dev/null +++ b/deps/rabbitmq_amqp_client/LICENSE @@ -0,0 +1,4 @@ +This package is licensed under the MPL 2.0. For the MPL 2.0, please see LICENSE-MPL-RabbitMQ. + +If you have any questions regarding licensing, please contact us at +rabbitmq-core@groups.vmware.com. diff --git a/deps/rabbitmq_amqp_client/LICENSE-MPL-RabbitMQ b/deps/rabbitmq_amqp_client/LICENSE-MPL-RabbitMQ new file mode 100644 index 000000000000..14e2f777f6c3 --- /dev/null +++ b/deps/rabbitmq_amqp_client/LICENSE-MPL-RabbitMQ @@ -0,0 +1,373 @@ +Mozilla Public License Version 2.0 +================================== + +1. Definitions +-------------- + +1.1. "Contributor" + means each individual or legal entity that creates, contributes to + the creation of, or owns Covered Software. + +1.2. "Contributor Version" + means the combination of the Contributions of others (if any) used + by a Contributor and that particular Contributor's Contribution. + +1.3. "Contribution" + means Covered Software of a particular Contributor. + +1.4. "Covered Software" + means Source Code Form to which the initial Contributor has attached + the notice in Exhibit A, the Executable Form of such Source Code + Form, and Modifications of such Source Code Form, in each case + including portions thereof. + +1.5. "Incompatible With Secondary Licenses" + means + + (a) that the initial Contributor has attached the notice described + in Exhibit B to the Covered Software; or + + (b) that the Covered Software was made available under the terms of + version 1.1 or earlier of the License, but not also under the + terms of a Secondary License. + +1.6. "Executable Form" + means any form of the work other than Source Code Form. + +1.7. "Larger Work" + means a work that combines Covered Software with other material, in + a separate file or files, that is not Covered Software. + +1.8. "License" + means this document. + +1.9. "Licensable" + means having the right to grant, to the maximum extent possible, + whether at the time of the initial grant or subsequently, any and + all of the rights conveyed by this License. + +1.10. "Modifications" + means any of the following: + + (a) any file in Source Code Form that results from an addition to, + deletion from, or modification of the contents of Covered + Software; or + + (b) any new file in Source Code Form that contains any Covered + Software. + +1.11. "Patent Claims" of a Contributor + means any patent claim(s), including without limitation, method, + process, and apparatus claims, in any patent Licensable by such + Contributor that would be infringed, but for the grant of the + License, by the making, using, selling, offering for sale, having + made, import, or transfer of either its Contributions or its + Contributor Version. + +1.12. "Secondary License" + means either the GNU General Public License, Version 2.0, the GNU + Lesser General Public License, Version 2.1, the GNU Affero General + Public License, Version 3.0, or any later versions of those + licenses. + +1.13. "Source Code Form" + means the form of the work preferred for making modifications. + +1.14. "You" (or "Your") + means an individual or a legal entity exercising rights under this + License. For legal entities, "You" includes any entity that + controls, is controlled by, or is under common control with You. For + purposes of this definition, "control" means (a) the power, direct + or indirect, to cause the direction or management of such entity, + whether by contract or otherwise, or (b) ownership of more than + fifty percent (50%) of the outstanding shares or beneficial + ownership of such entity. + +2. License Grants and Conditions +-------------------------------- + +2.1. Grants + +Each Contributor hereby grants You a world-wide, royalty-free, +non-exclusive license: + +(a) under intellectual property rights (other than patent or trademark) + Licensable by such Contributor to use, reproduce, make available, + modify, display, perform, distribute, and otherwise exploit its + Contributions, either on an unmodified basis, with Modifications, or + as part of a Larger Work; and + +(b) under Patent Claims of such Contributor to make, use, sell, offer + for sale, have made, import, and otherwise transfer either its + Contributions or its Contributor Version. + +2.2. Effective Date + +The licenses granted in Section 2.1 with respect to any Contribution +become effective for each Contribution on the date the Contributor first +distributes such Contribution. + +2.3. Limitations on Grant Scope + +The licenses granted in this Section 2 are the only rights granted under +this License. No additional rights or licenses will be implied from the +distribution or licensing of Covered Software under this License. +Notwithstanding Section 2.1(b) above, no patent license is granted by a +Contributor: + +(a) for any code that a Contributor has removed from Covered Software; + or + +(b) for infringements caused by: (i) Your and any other third party's + modifications of Covered Software, or (ii) the combination of its + Contributions with other software (except as part of its Contributor + Version); or + +(c) under Patent Claims infringed by Covered Software in the absence of + its Contributions. + +This License does not grant any rights in the trademarks, service marks, +or logos of any Contributor (except as may be necessary to comply with +the notice requirements in Section 3.4). + +2.4. Subsequent Licenses + +No Contributor makes additional grants as a result of Your choice to +distribute the Covered Software under a subsequent version of this +License (see Section 10.2) or under the terms of a Secondary License (if +permitted under the terms of Section 3.3). + +2.5. Representation + +Each Contributor represents that the Contributor believes its +Contributions are its original creation(s) or it has sufficient rights +to grant the rights to its Contributions conveyed by this License. + +2.6. Fair Use + +This License is not intended to limit any rights You have under +applicable copyright doctrines of fair use, fair dealing, or other +equivalents. + +2.7. Conditions + +Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted +in Section 2.1. + +3. Responsibilities +------------------- + +3.1. Distribution of Source Form + +All distribution of Covered Software in Source Code Form, including any +Modifications that You create or to which You contribute, must be under +the terms of this License. You must inform recipients that the Source +Code Form of the Covered Software is governed by the terms of this +License, and how they can obtain a copy of this License. You may not +attempt to alter or restrict the recipients' rights in the Source Code +Form. + +3.2. Distribution of Executable Form + +If You distribute Covered Software in Executable Form then: + +(a) such Covered Software must also be made available in Source Code + Form, as described in Section 3.1, and You must inform recipients of + the Executable Form how they can obtain a copy of such Source Code + Form by reasonable means in a timely manner, at a charge no more + than the cost of distribution to the recipient; and + +(b) You may distribute such Executable Form under the terms of this + License, or sublicense it under different terms, provided that the + license for the Executable Form does not attempt to limit or alter + the recipients' rights in the Source Code Form under this License. + +3.3. Distribution of a Larger Work + +You may create and distribute a Larger Work under terms of Your choice, +provided that You also comply with the requirements of this License for +the Covered Software. If the Larger Work is a combination of Covered +Software with a work governed by one or more Secondary Licenses, and the +Covered Software is not Incompatible With Secondary Licenses, this +License permits You to additionally distribute such Covered Software +under the terms of such Secondary License(s), so that the recipient of +the Larger Work may, at their option, further distribute the Covered +Software under the terms of either this License or such Secondary +License(s). + +3.4. Notices + +You may not remove or alter the substance of any license notices +(including copyright notices, patent notices, disclaimers of warranty, +or limitations of liability) contained within the Source Code Form of +the Covered Software, except that You may alter any license notices to +the extent required to remedy known factual inaccuracies. + +3.5. Application of Additional Terms + +You may choose to offer, and to charge a fee for, warranty, support, +indemnity or liability obligations to one or more recipients of Covered +Software. However, You may do so only on Your own behalf, and not on +behalf of any Contributor. You must make it absolutely clear that any +such warranty, support, indemnity, or liability obligation is offered by +You alone, and You hereby agree to indemnify every Contributor for any +liability incurred by such Contributor as a result of warranty, support, +indemnity or liability terms You offer. You may include additional +disclaimers of warranty and limitations of liability specific to any +jurisdiction. + +4. Inability to Comply Due to Statute or Regulation +--------------------------------------------------- + +If it is impossible for You to comply with any of the terms of this +License with respect to some or all of the Covered Software due to +statute, judicial order, or regulation then You must: (a) comply with +the terms of this License to the maximum extent possible; and (b) +describe the limitations and the code they affect. Such description must +be placed in a text file included with all distributions of the Covered +Software under this License. Except to the extent prohibited by statute +or regulation, such description must be sufficiently detailed for a +recipient of ordinary skill to be able to understand it. + +5. Termination +-------------- + +5.1. The rights granted under this License will terminate automatically +if You fail to comply with any of its terms. However, if You become +compliant, then the rights granted under this License from a particular +Contributor are reinstated (a) provisionally, unless and until such +Contributor explicitly and finally terminates Your grants, and (b) on an +ongoing basis, if such Contributor fails to notify You of the +non-compliance by some reasonable means prior to 60 days after You have +come back into compliance. Moreover, Your grants from a particular +Contributor are reinstated on an ongoing basis if such Contributor +notifies You of the non-compliance by some reasonable means, this is the +first time You have received notice of non-compliance with this License +from such Contributor, and You become compliant prior to 30 days after +Your receipt of the notice. + +5.2. If You initiate litigation against any entity by asserting a patent +infringement claim (excluding declaratory judgment actions, +counter-claims, and cross-claims) alleging that a Contributor Version +directly or indirectly infringes any patent, then the rights granted to +You by any and all Contributors for the Covered Software under Section +2.1 of this License shall terminate. + +5.3. In the event of termination under Sections 5.1 or 5.2 above, all +end user license agreements (excluding distributors and resellers) which +have been validly granted by You or Your distributors under this License +prior to termination shall survive termination. + +************************************************************************ +* * +* 6. Disclaimer of Warranty * +* ------------------------- * +* * +* Covered Software is provided under this License on an "as is" * +* basis, without warranty of any kind, either expressed, implied, or * +* statutory, including, without limitation, warranties that the * +* Covered Software is free of defects, merchantable, fit for a * +* particular purpose or non-infringing. The entire risk as to the * +* quality and performance of the Covered Software is with You. * +* Should any Covered Software prove defective in any respect, You * +* (not any Contributor) assume the cost of any necessary servicing, * +* repair, or correction. This disclaimer of warranty constitutes an * +* essential part of this License. No use of any Covered Software is * +* authorized under this License except under this disclaimer. * +* * +************************************************************************ + +************************************************************************ +* * +* 7. Limitation of Liability * +* -------------------------- * +* * +* Under no circumstances and under no legal theory, whether tort * +* (including negligence), contract, or otherwise, shall any * +* Contributor, or anyone who distributes Covered Software as * +* permitted above, be liable to You for any direct, indirect, * +* special, incidental, or consequential damages of any character * +* including, without limitation, damages for lost profits, loss of * +* goodwill, work stoppage, computer failure or malfunction, or any * +* and all other commercial damages or losses, even if such party * +* shall have been informed of the possibility of such damages. This * +* limitation of liability shall not apply to liability for death or * +* personal injury resulting from such party's negligence to the * +* extent applicable law prohibits such limitation. Some * +* jurisdictions do not allow the exclusion or limitation of * +* incidental or consequential damages, so this exclusion and * +* limitation may not apply to You. * +* * +************************************************************************ + +8. Litigation +------------- + +Any litigation relating to this License may be brought only in the +courts of a jurisdiction where the defendant maintains its principal +place of business and such litigation shall be governed by laws of that +jurisdiction, without reference to its conflict-of-law provisions. +Nothing in this Section shall prevent a party's ability to bring +cross-claims or counter-claims. + +9. Miscellaneous +---------------- + +This License represents the complete agreement concerning the subject +matter hereof. If any provision of this License is held to be +unenforceable, such provision shall be reformed only to the extent +necessary to make it enforceable. Any law or regulation which provides +that the language of a contract shall be construed against the drafter +shall not be used to construe this License against a Contributor. + +10. Versions of the License +--------------------------- + +10.1. New Versions + +Mozilla Foundation is the license steward. Except as provided in Section +10.3, no one other than the license steward has the right to modify or +publish new versions of this License. Each version will be given a +distinguishing version number. + +10.2. Effect of New Versions + +You may distribute the Covered Software under the terms of the version +of the License under which You originally received the Covered Software, +or under the terms of any subsequent version published by the license +steward. + +10.3. Modified Versions + +If you create software not governed by this License, and you want to +create a new license for such software, you may create and use a +modified version of this License if you rename the license and remove +any references to the name of the license steward (except to note that +such modified license differs from this License). + +10.4. Distributing Source Code Form that is Incompatible With Secondary +Licenses + +If You choose to distribute Source Code Form that is Incompatible With +Secondary Licenses under the terms of this version of the License, the +notice described in Exhibit B of this License must be attached. + +Exhibit A - Source Code Form License Notice +------------------------------------------- + + This Source Code Form is subject to the terms of the Mozilla Public + License, v. 2.0. If a copy of the MPL was not distributed with this + file, You can obtain one at http://mozilla.org/MPL/2.0/. + +If it is not possible or desirable to put the notice in a particular +file, then You may include the notice in a location (such as a LICENSE +file in a relevant directory) where a recipient would be likely to look +for such a notice. + +You may add additional accurate notices of copyright ownership. + +Exhibit B - "Incompatible With Secondary Licenses" Notice +--------------------------------------------------------- + + This Source Code Form is "Incompatible With Secondary Licenses", as + defined by the Mozilla Public License, v. 2.0. diff --git a/deps/rabbitmq_amqp_client/Makefile b/deps/rabbitmq_amqp_client/Makefile new file mode 100644 index 000000000000..99ec4850555b --- /dev/null +++ b/deps/rabbitmq_amqp_client/Makefile @@ -0,0 +1,22 @@ +PROJECT = rabbitmq_amqp_client +PROJECT_DESCRIPTION = AMQP 1.0 client for RabbitMQ + +DEPS = amqp10_client +TEST_DEPS = rabbitmq_ct_helpers + +BUILD_DEPS = rabbit_common +DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk +TEST_DEPS = rabbit rabbitmq_ct_helpers + +DEP_PLUGINS = rabbit_common/mk/rabbitmq-macros.mk \ + rabbit_common/mk/rabbitmq-build.mk \ + rabbit_common/mk/rabbitmq-hexpm.mk \ + rabbit_common/mk/rabbitmq-dist.mk \ + rabbit_common/mk/rabbitmq-run.mk \ + rabbit_common/mk/rabbitmq-test.mk \ + rabbit_common/mk/rabbitmq-tools.mk + +.DEFAULT_GOAL = all + +include rabbitmq-components.mk +include erlang.mk diff --git a/deps/rabbitmq_amqp_client/README.md b/deps/rabbitmq_amqp_client/README.md new file mode 100644 index 000000000000..a9f26265a1fe --- /dev/null +++ b/deps/rabbitmq_amqp_client/README.md @@ -0,0 +1,22 @@ +# Erlang RabbitMQ AMQP 1.0 Client + +The [Erlang AMQP 1.0 client](../amqp10_client/) is a client that can communicate with any AMQP 1.0 broker. +In contrast, this project (Erlang **RabbitMQ** AMQP 1.0 Client) can only communicate with RabbitMQ. +This project wraps the Erlang AMQP 1.0 client providing the following RabbitMQ management operations: +* declare queue +* delete queue +* purge queue +* bind queue to exchange +* unbind queue from exchange +* declare exchange +* delete exchange +* bind exchange to exchange +* unbind exchange from exchange + +These management operations are defined in the [AMQP 0.9.1 protocol](https://www.rabbitmq.com/amqp-0-9-1-reference.html). +To support these AMQP 0.9.1 / RabbitMQ specific operations over AMQP 1.0, this project implements a subset of the following (most recent) AMQP 1.0 extension specifications: +* [AMQP Management Version 1.0 - Working Draft 16](https://www.oasis-open.org/committees/download.php/65575/amqp-man-v1.0-wd16.docx) (July 2019) +* [HTTP Semantics and Content over AMQP Version 1.0 - Working Draft 06](https://www.oasis-open.org/committees/download.php/65571/http-over-amqp-v1.0-wd06a.docx) (July 2019) +* [AMQP Request-Response Messaging with Link Pairing Version 1.0 - Committee Specification 01](https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html) (February 2021) + +This project might support more (non AMQP 0.9.1) RabbitMQ operations via AMQP 1.0 in the future. diff --git a/deps/rabbitmq_amqp_client/app.bzl b/deps/rabbitmq_amqp_client/app.bzl new file mode 100644 index 000000000000..826d7770f96e --- /dev/null +++ b/deps/rabbitmq_amqp_client/app.bzl @@ -0,0 +1,67 @@ +load("@rules_erlang//:erlang_bytecode2.bzl", "erlang_bytecode") +load("@rules_erlang//:filegroup.bzl", "filegroup") + +def all_beam_files(name = "all_beam_files"): + filegroup( + name = "beam_files", + srcs = [":other_beam"], + ) + erlang_bytecode( + name = "other_beam", + srcs = ["src/rabbitmq_amqp_client.erl"], + hdrs = [":public_and_private_hdrs"], + app_name = "rabbitmq_amqp_client", + dest = "ebin", + erlc_opts = "//:erlc_opts", + ) + +def all_srcs(name = "all_srcs"): + filegroup( + name = "srcs", + srcs = ["src/rabbitmq_amqp_client.erl"], + ) + filegroup(name = "private_hdrs") + filegroup(name = "public_hdrs") + filegroup(name = "priv") + filegroup( + name = "license_files", + srcs = [ + "LICENSE", + "LICENSE-MPL-RabbitMQ", + ], + ) + filegroup( + name = "public_and_private_hdrs", + srcs = [":private_hdrs", ":public_hdrs"], + ) + filegroup( + name = "all_srcs", + srcs = [":public_and_private_hdrs", ":srcs"], + ) + +def all_test_beam_files(name = "all_test_beam_files"): + erlang_bytecode( + name = "test_other_beam", + testonly = True, + srcs = ["src/rabbitmq_amqp_client.erl"], + hdrs = [":public_and_private_hdrs"], + app_name = "rabbitmq_amqp_client", + dest = "test", + erlc_opts = "//:test_erlc_opts", + ) + filegroup( + name = "test_beam_files", + testonly = True, + srcs = [":test_other_beam"], + ) + +def test_suite_beam_files(name = "test_suite_beam_files"): + erlang_bytecode( + name = "management_SUITE_beam_files", + testonly = True, + srcs = ["test/management_SUITE.erl"], + outs = ["test/management_SUITE.beam"], + app_name = "rabbitmq_amqp_client", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp10_common:erlang_app"], + ) diff --git a/deps/rabbitmq_amqp_client/erlang.mk b/deps/rabbitmq_amqp_client/erlang.mk new file mode 120000 index 000000000000..59af4a527a9d --- /dev/null +++ b/deps/rabbitmq_amqp_client/erlang.mk @@ -0,0 +1 @@ +../../erlang.mk \ No newline at end of file diff --git a/deps/rabbitmq_amqp_client/rabbitmq-components.mk b/deps/rabbitmq_amqp_client/rabbitmq-components.mk new file mode 120000 index 000000000000..43c0d3567154 --- /dev/null +++ b/deps/rabbitmq_amqp_client/rabbitmq-components.mk @@ -0,0 +1 @@ +../../rabbitmq-components.mk \ No newline at end of file diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl new file mode 100644 index 000000000000..ac821de8b79b --- /dev/null +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl @@ -0,0 +1,201 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +-module(rabbitmq_amqp_client). + +-feature(maybe_expr, enable). + +-export[attach_management_link_pair_sync/2, + declare_queue/2, + purge_queue/2, + delete_queue/2]. + +-define(TIMEOUT, 10_000). +-define(MANAGEMENT_NODE_ADDRESS, <<"$management">>). + +-record(link_pair, {outgoing_link :: amqp10_client:link_ref(), + incoming_link :: amqp10_client:link_ref()}). +-type link_pair() :: #link_pair{}. + +-type x_args() :: #{binary() => {atom(), term()}}. + +-type queue_properties() :: #{name => binary(), + durable => boolean(), + exclusive => boolean(), + auto_delete => boolean(), + arguments => x_args()}. + +-export_type([link_pair/0]). + +-spec attach_management_link_pair_sync(pid(), binary()) -> + {ok, link_pair()} | {error, term()}. +attach_management_link_pair_sync(Session, Name) -> + Terminus = #{address => ?MANAGEMENT_NODE_ADDRESS, + durable => none}, + OutgoingAttachArgs = #{name => Name, + role => {sender, Terminus}, + snd_settle_mode => settled, + rcv_settle_mode => first, + properties => #{<<"paired">> => true}}, + IncomingAttachArgs = OutgoingAttachArgs#{role := {receiver, Terminus, self()}, + filter => #{}}, + maybe + {ok, OutgoingRef} ?= attach(Session, OutgoingAttachArgs), + {ok, IncomingRef} ?= attach(Session, IncomingAttachArgs), + ok ?= await_attached(OutgoingRef), + ok ?= await_attached(IncomingRef), + {ok, #link_pair{outgoing_link = OutgoingRef, + incoming_link = IncomingRef}} + end. + +-spec attach(pid(), amqp10_client:attach_args()) -> + {ok, amqp10_client:link_ref()} | {error, term()}. +attach(Session, AttachArgs) -> + try amqp10_client:attach_link(Session, AttachArgs) + catch exit:Reason -> + {error, Reason} + end. + +-spec await_attached(amqp10_client:link_ref()) -> + ok | {error, term()}. +await_attached(Ref) -> + receive + {amqp10_event, {link, Ref, attached}} -> + ok; + {amqp10_event, {link, Ref, {detached, Err}}} -> + {error, Err} + after ?TIMEOUT -> + {error, timeout} + end. + +-spec declare_queue(link_pair(), queue_properties()) -> + {ok, map()} | {error, term()}. +declare_queue(#link_pair{outgoing_link = OutgoingLink, + incoming_link = IncomingLink}, + QueueProperties) -> + Body0 = maps:fold( + fun(name, V, Acc) when is_binary(V) -> + [{{utf8, <<"name">>}, {utf8, V}} | Acc]; + (durable, V, Acc) when is_boolean(V) -> + [{{utf8, <<"durable">>}, {boolean, V}} | Acc]; + (exclusive, V, Acc) when is_boolean(V) -> + [{{utf8, <<"exclusive">>}, {boolean, V}} | Acc]; + (auto_delete, V, Acc) when is_boolean(V) -> + [{{utf8, <<"auto_delete">>}, {boolean, V}} | Acc]; + (arguments, V, Acc) -> + KVList = maps:fold( + fun(K = <<"x-", _/binary>>, TaggedVal = {T, _}, L) + when is_atom(T) -> + [{{utf8, K}, TaggedVal} | L] + end, [], V), + [{{utf8, <<"arguments">>}, {map, KVList}} | Acc] + end, [{{utf8, <<"type">>}, {utf8, <<"queue">>}}], QueueProperties), + Body1 = {map, Body0}, + Body = iolist_to_binary(amqp10_framing:encode_bin(Body1)), + + MessageId = message_id(), + HttpMethod = <<"POST">>, + HttpRequestTarget = <<"/$management/entities">>, + ContentType = <<"application/amqp-management+amqp;type=entity">>, + Props = #{message_id => {binary, MessageId}, + to => HttpRequestTarget, + subject => HttpMethod, + reply_to => <<"$me">>, + content_type => ContentType}, + Req0 = amqp10_msg:new(<<>>, Body, true), + Req = amqp10_msg:set_properties(Props, Req0), + + ok = amqp10_client:flow_link_credit(IncomingLink, 1, never), + maybe + ok ?= amqp10_client:send_msg(OutgoingLink, Req), + {ok, Resp} ?= receive {amqp10_msg, IncomingLink, Message} -> {ok, Message} + after ?TIMEOUT -> {error, response_timeout} + end, + #{correlation_id := MessageId, + subject := <<"201">>, + content_type := <<"application/amqp-management+amqp;type=entity-collection">> + } = amqp10_msg:properties(Resp), + #{<<"http:response">> := <<"1.1">>, + <<"location">> := _QueueURI + } = amqp10_msg:application_properties(Resp), + RespBody = amqp10_msg:body_bin(Resp), + [{map, KVList}] = amqp10_framing:decode_bin(RespBody), + {ok, proplists:to_map(KVList)} + end. + +-spec purge_queue(link_pair(), binary()) -> + {ok, map()} | {error, term()}. +purge_queue(#link_pair{outgoing_link = OutgoingLink, + incoming_link = IncomingLink}, + QueueName) -> + MessageId = message_id(), + HttpMethod = <<"POST">>, + HttpRequestTarget = <<"/$management/queues/", QueueName/binary, "/$management/purge">>, + Props = #{message_id => {binary, MessageId}, + to => HttpRequestTarget, + subject => HttpMethod, + reply_to => <<"$me">>}, + Req0 = amqp10_msg:new(<<>>, <<>>, true), + Req = amqp10_msg:set_properties(Props, Req0), + + ok = amqp10_client:flow_link_credit(IncomingLink, 1, never), + maybe + ok ?= amqp10_client:send_msg(OutgoingLink, Req), + {ok, Resp} ?= receive {amqp10_msg, IncomingLink, Message} -> {ok, Message} + after ?TIMEOUT -> {error, response_timeout} + end, + #{correlation_id := MessageId, + subject := <<"200">>, + content_type := <<"application/amqp-management+amqp">> + } = amqp10_msg:properties(Resp), + #{<<"http:response">> := <<"1.1">> } = amqp10_msg:application_properties(Resp), + RespBody = amqp10_msg:body_bin(Resp), + [{map, [ + {{utf8, <<"message_count">>}, {ulong, Count}} + ] + }] = amqp10_framing:decode_bin(RespBody), + {ok, #{message_count => Count}} + end. + +-spec delete_queue(link_pair(), binary()) -> + {ok, map()} | {error, term()}. +delete_queue(#link_pair{outgoing_link = OutgoingLink, + incoming_link = IncomingLink}, + QueueName) -> + MessageId = message_id(), + HttpMethod = <<"DELETE">>, + HttpRequestTarget = <<"/$management/queues/", QueueName/binary>>, + Props = #{message_id => {binary, MessageId}, + to => HttpRequestTarget, + subject => HttpMethod, + reply_to => <<"$me">>}, + Req0 = amqp10_msg:new(<<>>, <<>>, true), + Req = amqp10_msg:set_properties(Props, Req0), + + ok = amqp10_client:flow_link_credit(IncomingLink, 1, never), + maybe + ok ?= amqp10_client:send_msg(OutgoingLink, Req), + {ok, Resp} ?= receive {amqp10_msg, IncomingLink, Message} -> {ok, Message} + after ?TIMEOUT -> {error, response_timeout} + end, + #{correlation_id := MessageId, + subject := <<"200">>, + content_type := <<"application/amqp-management+amqp">> + } = amqp10_msg:properties(Resp), + #{<<"http:response">> := <<"1.1">> } = amqp10_msg:application_properties(Resp), + RespBody = amqp10_msg:body_bin(Resp), + [{map, [ + {{utf8, <<"message_count">>}, {ulong, Count}} + ] + }] = amqp10_framing:decode_bin(RespBody), + {ok, #{message_count => Count}} + end. + +%% "The message producer is usually responsible for setting the message-id in +%% such a way that it is assured to be globally unique." [3.2.4] +-spec message_id() -> binary(). +message_id() -> + rand:bytes(8). diff --git a/deps/rabbitmq_amqp_client/test/management_SUITE.erl b/deps/rabbitmq_amqp_client/test/management_SUITE.erl new file mode 100644 index 000000000000..4d041bf1e94e --- /dev/null +++ b/deps/rabbitmq_amqp_client/test/management_SUITE.erl @@ -0,0 +1,131 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(management_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-include_lib("amqp10_common/include/amqp10_framing.hrl"). + +-compile([export_all, + nowarn_export_all]). + +suite() -> + [{timetrap, {minutes, 3}}]. + + +all() -> + [{group, tests}]. + +groups() -> + [ + {tests, [], [queue]} + ]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(amqp10_client), + rabbit_ct_helpers:log_environment(), + Config. + +end_per_suite(Config) -> + Config. + +init_per_group(_Group, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config( + Config, [{rmq_nodes_count, 1}, + {rmq_nodename_suffix, Suffix}]), + rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps()). + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +queue(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, C, opened}} + when C =:= Connection -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync( + Session, <<"my-link-pair">>), + + QName = <<"q1">>, + Q = #{name => QName, + durable => true, + exclusive => false, + auto_delete => false, + arguments => #{<<"x-queue-type">> => {symbol, <<"quorum">>}}}, + {ok, #{{utf8, <<"target">>} := {utf8, Address}} } = rabbitmq_amqp_client:declare_queue(LinkPair, Q), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Address), + ok = wait_for_credit(Sender), + flush(credited), + DTag = <<"tag 1">>, + Msg1 = amqp10_msg:new(DTag, <<"m1">>, false), + ok = amqp10_client:send_msg(Sender, Msg1), + ok = wait_for_accepted(DTag), + + ?assertEqual({ok, #{message_count => 1}}, + rabbitmq_amqp_client:purge_queue(LinkPair, QName)), + + ?assertEqual({ok, #{message_count => 0}}, + rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection). + +connection_config(Config) -> + Host = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + #{address => Host, + port => Port, + container_id => <<"my container">>, + sasl => {plain, <<"guest">>, <<"guest">>}}. + +wait_for_credit(Sender) -> + receive + {amqp10_event, {link, Sender, credited}} -> + ok + after 5000 -> + flush(?FUNCTION_NAME), + ct:fail(?FUNCTION_NAME) + end. + +flush(Prefix) -> + receive + Msg -> + ct:pal("~p flushed: ~p~n", [Prefix, Msg]), + flush(Prefix) + after 1 -> + ok + end. + +wait_for_accepted(Tag) -> + wait_for_settlement(Tag, accepted). + +wait_for_settlement(Tag, State) -> + receive + {amqp10_disposition, {State, Tag}} -> + ok + after 5000 -> + Reason = {?FUNCTION_NAME, Tag}, + flush(Reason), + ct:fail(Reason) + end. diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl index f64ad8cbf4d4..d8738b1de580 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl @@ -979,7 +979,7 @@ connections_test_amqp(Config) -> auth_mechanism := <<"PLAIN">>, protocol := <<"AMQP 1-0">>, client_properties := #{version := _, - product := <<"AMQP 1.0 client from the RabbitMQ Project">>, + product := <<"AMQP 1.0 client">>, platform := _}}, Connection1), ConnectionName = maps:get(name, Connection1), diff --git a/moduleindex.yaml b/moduleindex.yaml index 19f69504b60d..d3a719ebcd77 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -544,6 +544,7 @@ rabbit: - rabbit_access_control - rabbit_alarm - rabbit_amqp1_0 +- rabbit_amqp_management - rabbit_amqp_reader - rabbit_amqp_session - rabbit_amqp_session_sup @@ -824,6 +825,8 @@ rabbit_common: - worker_pool - worker_pool_sup - worker_pool_worker +rabbitmq_amqp_client: +- rabbitmq_amqp_client rabbitmq_amqp1_0: - rabbitmq_amqp1_0_noop rabbitmq_auth_backend_cache: