diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index ffdcfd5837a6..3788f20afe3a 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -662,7 +662,11 @@ make_target(#{role := {receiver, _Source, _Pid}}) -> #'v1_0.target'{}; make_target(#{role := {sender, #{address := Address} = Target}}) -> Durable = translate_terminus_durability(maps:get(durable, Target, none)), - #'v1_0.target'{address = {utf8, Address}, + TargetAddr = case is_binary(Address) of + true -> {utf8, Address}; + false -> Address + end, + #'v1_0.target'{address = TargetAddr, durable = {uint, Durable}}. max_message_size(#{max_message_size := Size}) diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 62ab9c2a461d..cefc6a9be761 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -83,7 +83,8 @@ init_per_suite(Config) -> ]). end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_helpers:run_teardown_steps( + Config, [ fun stop_amqp10_client_app/1 ]). diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 7d9eae16d5fc..fbfa8f5fbc25 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -1264,6 +1264,7 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "amqp_auth_SUITE", + shard_count = 2, additional_beam = [ ":test_event_recorder_beam", ], @@ -1272,6 +1273,14 @@ rabbitmq_integration_suite( ], ) +rabbitmq_integration_suite( + name = "amqp_address_SUITE", + shard_count = 2, + runtime_deps = [ + "//deps/rabbitmq_amqp_client:erlang_app", + ], +) + rabbitmq_integration_suite( name = "amqp_credit_api_v2_SUITE", runtime_deps = [ diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 39151d7d0425..4fa698530c5d 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -2212,3 +2212,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/rabbit_common:erlang_app"], ) + erlang_bytecode( + name = "amqp_address_SUITE_beam_files", + testonly = True, + srcs = ["test/amqp_address_SUITE.erl"], + outs = ["test/amqp_address_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbitmq_amqp_client:erlang_app"], + ) diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 9e4f7f6fe359..1e34a4a49405 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -146,20 +146,17 @@ init(Proto, Data, Anns) -> init(Proto, Data, Anns, #{}). -spec init(protocol(), term(), annotations(), environment()) -> state(). -init(Proto, Data, Anns0, Env) - when is_atom(Proto) - andalso is_map(Anns0) - andalso is_map(Env) -> +init(Proto, Data, Anns0, Env) -> {ProtoData, ProtoAnns} = Proto:init(Data), - Anns = case maps:size(Env) == 0 of - true -> - Anns0; - false -> - Anns0#{env => Env} - end, + Anns1 = case map_size(Env) == 0 of + true -> Anns0; + false -> Anns0#{env => Env} + end, + Anns2 = maps:merge(ProtoAnns, Anns1), + Anns = set_received_at_timestamp(Anns2), #?MODULE{protocol = Proto, data = ProtoData, - annotations = set_received_at_timestamp(maps:merge(ProtoAnns, Anns))}. + annotations = Anns}. -spec size(state()) -> {MetadataSize :: non_neg_integer(), @@ -196,7 +193,7 @@ take_annotation(_Key, BasicMessage) -> -spec set_annotation(ann_key(), ann_value(), state()) -> state(). set_annotation(Key, Value, #?MODULE{annotations = Anns} = State) -> - State#?MODULE{annotations = maps:put(Key, Value, Anns)}; + State#?MODULE{annotations = Anns#{Key => Value}}; set_annotation(Key, Value, BasicMessage) -> mc_compat:set_annotation(Key, Value, BasicMessage). @@ -313,7 +310,7 @@ property(_Property, _BasicMsg) -> -spec set_ttl(undefined | non_neg_integer(), state()) -> state(). set_ttl(Value, #?MODULE{annotations = Anns} = State) -> - State#?MODULE{annotations = maps:put(ttl, Value, Anns)}; + State#?MODULE{annotations = Anns#{ttl => Value}}; set_ttl(Value, BasicMsg) -> mc_compat:set_ttl(Value, BasicMsg). diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index 0958ac2cf461..79107e93f8e1 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -117,6 +117,8 @@ property(user_id, #msg{properties = #'v1_0.properties'{user_id = UserId}}) -> UserId; property(subject, #msg{properties = #'v1_0.properties'{subject = Subject}}) -> Subject; +property(to, #msg{properties = #'v1_0.properties'{to = To}}) -> + To; property(_Prop, #msg{}) -> undefined. diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index f5f68c5a32bf..fbf1b9444340 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1709,8 +1709,7 @@ persist_static_configuration() -> _ -> ?MAX_MSG_SIZE end, - ok = persistent_term:put(max_message_size, MaxMsgSize), - ok = rabbit_amqp_management:persist_static_configuration(). + ok = persistent_term:put(max_message_size, MaxMsgSize). persist_static_configuration(Params) -> App = ?MODULE, diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index 760ff6c41abb..23024c86511e 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -3,16 +3,24 @@ -include("rabbit_amqp.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). --export([persist_static_configuration/0, - handle_request/4]). +-export([handle_request/5]). --define(DEAD_LETTER_EXCHANGE_KEY, <<"x-dead-letter-exchange">>). --define(MP_BINDING_URI_PATH_SEGMENT, mp_binding_uri_path_segment). +-import(rabbit_amqp_session, + [check_resource_access/4, + check_read_permitted_on_topic/4]). + +-type permission_caches() :: {rabbit_amqp_session:permission_cache(), + rabbit_amqp_session:topic_permission_cache()}. --type resource_name() :: rabbit_types:exchange_name() | rabbit_types:rabbit_amqqueue_name(). +-define(DEAD_LETTER_EXCHANGE_KEY, <<"x-dead-letter-exchange">>). --spec handle_request(binary(), rabbit_types:vhost(), rabbit_types:user(), pid()) -> iolist(). -handle_request(Request, Vhost, User, ConnectionPid) -> +-spec handle_request(binary(), + rabbit_types:vhost(), + rabbit_types:user(), + pid(), + permission_caches()) -> + {iolist(), permission_caches()}. +handle_request(Request, Vhost, User, ConnectionPid, PermCaches0) -> ReqSections = amqp10_framing:decode_bin(Request), ?DEBUG("~s Inbound request:~n ~tp", [?MODULE, [amqp10_framing:pprint(Section) || Section <- ReqSections]]), @@ -28,19 +36,22 @@ handle_request(Request, Vhost, User, ConnectionPid) -> } = decode_req(ReqSections, {undefined, undefined}), {StatusCode, - RespBody} = try {PathSegments, QueryMap} = parse_uri(HttpRequestTarget), - handle_http_req(HttpMethod, - PathSegments, - QueryMap, - ReqBody, - Vhost, - User, - ConnectionPid) - catch throw:{?MODULE, StatusCode0, Explanation} -> - rabbit_log:warning("request ~ts ~ts failed: ~ts", - [HttpMethod, HttpRequestTarget, Explanation]), - {StatusCode0, {utf8, Explanation}} - end, + RespBody, + PermCaches + } = try {PathSegments, QueryMap} = parse_uri(HttpRequestTarget), + handle_http_req(HttpMethod, + PathSegments, + QueryMap, + ReqBody, + Vhost, + User, + ConnectionPid, + PermCaches0) + catch throw:{?MODULE, StatusCode0, Explanation} -> + rabbit_log:warning("request ~ts ~ts failed: ~ts", + [HttpMethod, HttpRequestTarget, Explanation]), + {StatusCode0, {utf8, Explanation}, PermCaches0} + end, RespProps = #'v1_0.properties'{ subject = {utf8, StatusCode}, @@ -54,7 +65,8 @@ handle_request(Request, Vhost, User, ConnectionPid) -> ]}, RespDataSect = #'v1_0.amqp_value'{content = RespBody}, RespSections = [RespProps, RespAppProps, RespDataSect], - [amqp10_framing:encode_bin(Sect) || Sect <- RespSections]. + IoList = [amqp10_framing:encode_bin(Sect) || Sect <- RespSections], + {IoList, PermCaches}. handle_http_req(<<"GET">>, [<<"queues">>, QNameBinQuoted], @@ -62,7 +74,8 @@ handle_http_req(<<"GET">>, null, Vhost, _User, - _ConnPid) -> + _ConnPid, + PermCaches) -> QNameBin = uri_string:unquote(QNameBinQuoted), QName = rabbit_misc:r(Vhost, queue, QNameBin), case rabbit_amqqueue:with( @@ -70,7 +83,7 @@ handle_http_req(<<"GET">>, fun(Q) -> {ok, NumMsgs, NumConsumers} = rabbit_amqqueue:stat(Q), RespPayload = encode_queue(Q, NumMsgs, NumConsumers), - {ok, {<<"200">>, RespPayload}} + {ok, {<<"200">>, RespPayload, PermCaches}} end) of {ok, Result} -> Result; @@ -86,7 +99,8 @@ handle_http_req(HttpMethod = <<"PUT">>, ReqPayload, Vhost, User = #user{username = Username}, - ConnPid) -> + ConnPid, + {PermCache0, TopicPermCache}) -> #{durable := Durable, auto_delete := AutoDelete, exclusive := Exclusive, @@ -106,10 +120,10 @@ handle_http_req(HttpMethod = <<"PUT">>, ok = prohibit_cr_lf(QNameBin), QName = rabbit_misc:r(Vhost, queue, QNameBin), ok = prohibit_reserved_amq(QName), - ok = check_resource_access(QName, configure, User), + PermCache1 = check_resource_access(QName, configure, User, PermCache0), rabbit_core_metrics:queue_declared(QName), - {Q1, NumMsgs, NumConsumers, StatusCode} = + {Q1, NumMsgs, NumConsumers, StatusCode, PermCache} = case rabbit_amqqueue:with( QName, fun(Q) -> @@ -117,7 +131,7 @@ handle_http_req(HttpMethod = <<"PUT">>, Q, Durable, AutoDelete, QArgs, Owner) of ok -> {ok, Msgs, Consumers} = rabbit_amqqueue:stat(Q), - {ok, {Q, Msgs, Consumers, <<"200">>}} + {ok, {Q, Msgs, Consumers, <<"200">>, PermCache1}} catch exit:#amqp_error{name = precondition_failed, explanation = Expl} -> throw(<<"409">>, Expl, []); @@ -129,23 +143,23 @@ handle_http_req(HttpMethod = <<"PUT">>, Result; {error, not_found} -> ok = check_vhost_queue_limit(QName), - ok = check_dead_letter_exchange(QName, QArgs, User), + PermCache2 = check_dead_letter_exchange(QName, QArgs, User, PermCache1), case rabbit_amqqueue:declare( QName, Durable, AutoDelete, QArgs, Owner, Username) of {new, Q} -> rabbit_core_metrics:queue_created(QName), - {Q, 0, 0, <<"201">>}; + {Q, 0, 0, <<"201">>, PermCache2}; {owner_died, Q} -> %% Presumably our own days are numbered since the %% connection has died. Pretend the queue exists though, %% just so nothing fails. - {Q, 0, 0, <<"201">>}; + {Q, 0, 0, <<"201">>, PermCache2}; {absent, Q, Reason} -> absent(Q, Reason); {existing, _Q} -> %% Must have been created in the meantime. Loop around again. - handle_http_req(HttpMethod, PathSegments, Query, - ReqPayload, Vhost, User, ConnPid); + handle_http_req(HttpMethod, PathSegments, Query, ReqPayload, + Vhost, User, ConnPid, {PermCache2, TopicPermCache}); {protocol_error, _ErrorType, Reason, ReasonArgs} -> throw(<<"400">>, Reason, ReasonArgs) end; @@ -154,7 +168,7 @@ handle_http_req(HttpMethod = <<"PUT">>, end, RespPayload = encode_queue(Q1, NumMsgs, NumConsumers), - {StatusCode, RespPayload}; + {StatusCode, RespPayload, {PermCache, TopicPermCache}}; handle_http_req(<<"PUT">>, [<<"exchanges">>, XNameBinQuoted], @@ -162,7 +176,8 @@ handle_http_req(<<"PUT">>, ReqPayload, Vhost, User = #user{username = Username}, - _ConnPid) -> + _ConnPid, + {PermCache0, TopicPermCache}) -> XNameBin = uri_string:unquote(XNameBinQuoted), #{type := XTypeBin, durable := Durable, @@ -176,7 +191,7 @@ handle_http_req(<<"PUT">>, end, XName = rabbit_misc:r(Vhost, exchange, XNameBin), ok = prohibit_default_exchange(XName), - ok = check_resource_access(XName, configure, User), + PermCache = check_resource_access(XName, configure, User, PermCache0), X = case rabbit_exchange:lookup(XName) of {ok, FoundX} -> FoundX; @@ -190,7 +205,7 @@ handle_http_req(<<"PUT">>, try rabbit_exchange:assert_equivalence( X, XTypeAtom, Durable, AutoDelete, Internal, XArgs) of ok -> - {<<"204">>, null} + {<<"204">>, null, {PermCache, TopicPermCache}} catch exit:#amqp_error{name = precondition_failed, explanation = Expl} -> throw(<<"409">>, Expl, []) @@ -202,17 +217,18 @@ handle_http_req(<<"DELETE">>, null, Vhost, User, - ConnPid) -> + ConnPid, + {PermCache0, TopicPermCache}) -> QNameBin = uri_string:unquote(QNameBinQuoted), QName = rabbit_misc:r(Vhost, queue, QNameBin), - ok = check_resource_access(QName, read, User), + PermCache = check_resource_access(QName, read, User, PermCache0), try rabbit_amqqueue:with_exclusive_access_or_die( QName, ConnPid, fun (Q) -> case rabbit_queue_type:purge(Q) of {ok, NumMsgs} -> RespPayload = purge_or_delete_queue_response(NumMsgs), - {<<"200">>, RespPayload}; + {<<"200">>, RespPayload, {PermCache, TopicPermCache}}; {error, not_supported} -> throw(<<"400">>, "purge not supported by ~ts", @@ -229,15 +245,16 @@ handle_http_req(<<"DELETE">>, null, Vhost, User = #user{username = Username}, - ConnPid) -> + ConnPid, + {PermCache0, TopicPermCache}) -> QNameBin = uri_string:unquote(QNameBinQuoted), QName = rabbit_misc:r(Vhost, queue, QNameBin), ok = prohibit_cr_lf(QNameBin), - ok = check_resource_access(QName, configure, User), + PermCache = check_resource_access(QName, configure, User, PermCache0), try rabbit_amqqueue:delete_with(QName, ConnPid, false, false, Username, true) of {ok, NumMsgs} -> RespPayload = purge_or_delete_queue_response(NumMsgs), - {<<"200">>, RespPayload} + {<<"200">>, RespPayload, {PermCache, TopicPermCache}} catch exit:#amqp_error{explanation = Explanation} -> throw(<<"400">>, Explanation, []) end; @@ -248,15 +265,16 @@ handle_http_req(<<"DELETE">>, null, Vhost, User = #user{username = Username}, - _ConnPid) -> + _ConnPid, + {PermCache0, TopicPermCache}) -> XNameBin = uri_string:unquote(XNameBinQuoted), XName = rabbit_misc:r(Vhost, exchange, XNameBin), ok = prohibit_cr_lf(XNameBin), ok = prohibit_default_exchange(XName), ok = prohibit_reserved_amq(XName), - ok = check_resource_access(XName, configure, User), + PermCache = check_resource_access(XName, configure, User, PermCache0), _ = rabbit_exchange:delete(XName, false, Username), - {<<"204">>, null}; + {<<"204">>, null, {PermCache, TopicPermCache}}; handle_http_req(<<"POST">>, [<<"bindings">>], @@ -264,7 +282,8 @@ handle_http_req(<<"POST">>, ReqPayload, Vhost, User = #user{username = Username}, - ConnPid) -> + ConnPid, + PermCaches0) -> #{source := SrcXNameBin, binding_key := BindingKey, arguments := Args} = BindingMap = decode_binding(ReqPayload), @@ -276,13 +295,13 @@ handle_http_req(<<"POST">>, end, SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin), DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin), - ok = binding_checks(SrcXName, DstName, BindingKey, User), + PermCaches = binding_checks(SrcXName, DstName, BindingKey, User, PermCaches0), Binding = #binding{source = SrcXName, destination = DstName, key = BindingKey, args = Args}, ok = binding_action(add, Binding, Username, ConnPid), - {<<"204">>, null}; + {<<"204">>, null, PermCaches}; handle_http_req(<<"DELETE">>, [<<"bindings">>, BindingSegment], @@ -290,7 +309,8 @@ handle_http_req(<<"DELETE">>, null, Vhost, User = #user{username = Username}, - ConnPid) -> + ConnPid, + PermCaches0) -> {SrcXNameBin, DstKind, DstNameBin, @@ -298,7 +318,7 @@ handle_http_req(<<"DELETE">>, ArgsHash} = decode_binding_path_segment(BindingSegment), SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin), DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin), - ok = binding_checks(SrcXName, DstName, BindingKey, User), + PermCaches = binding_checks(SrcXName, DstName, BindingKey, User, PermCaches0), Bindings = rabbit_binding:list_for_source_and_destination(SrcXName, DstName), case search_binding(BindingKey, ArgsHash, Bindings) of {value, Binding} -> @@ -306,7 +326,7 @@ handle_http_req(<<"DELETE">>, false -> ok end, - {<<"204">>, null}; + {<<"204">>, null, PermCaches}; handle_http_req(<<"GET">>, [<<"bindings">>], @@ -315,7 +335,8 @@ handle_http_req(<<"GET">>, null, Vhost, _User, - _ConnPid) -> + _ConnPid, + PermCaches) -> {DstKind, DstNameBin} = case QueryMap of #{<<"dste">> := DstX} -> @@ -332,7 +353,7 @@ handle_http_req(<<"GET">>, Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName), Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key], RespPayload = encode_bindings(Bindings), - {<<"200">>, RespPayload}. + {<<"200">>, RespPayload, PermCaches}. decode_queue({map, KVList}) -> M = lists:foldl( @@ -549,19 +570,21 @@ compose_binding_uri(Src, DstKind, Dst, Key, Args) -> ";key=", KeyQ/binary, ";args=", ArgsHash/binary>>. --spec persist_static_configuration() -> ok. -persist_static_configuration() -> - %% This regex matches for example binding: - %% src=e1;dstq=q2;key=my-key;args= - %% Source, destination, and binding key values must be percent encoded. - %% Binding args use the URL safe Base 64 Alphabet: https://datatracker.ietf.org/doc/html/rfc4648#section-5 - {ok, MP} = re:compile( - <<"^src=([0-9A-Za-z\-.\_\~%]+);dst([eq])=([0-9A-Za-z\-.\_\~%]+);", - "key=([0-9A-Za-z\-.\_\~%]*);args=([0-9A-Za-z\-\_]*)$">>), - ok = persistent_term:put(?MP_BINDING_URI_PATH_SEGMENT, MP). - decode_binding_path_segment(Segment) -> - MP = persistent_term:get(?MP_BINDING_URI_PATH_SEGMENT), + PersistentTermKey = mp_binding_uri_path_segment, + MP = try persistent_term:get(PersistentTermKey) + catch error:badarg -> + %% This regex matches for example binding: + %% src=e1;dstq=q2;key=my-key;args= + %% Source, destination, and binding key values must be percent encoded. + %% Binding args use the URL safe Base 64 Alphabet: + %% https://datatracker.ietf.org/doc/html/rfc4648#section-5 + {ok, MP0} = re:compile( + <<"^src=([0-9A-Za-z\-.\_\~%]+);dst([eq])=([0-9A-Za-z\-.\_\~%]+);", + "key=([0-9A-Za-z\-.\_\~%]*);args=([0-9A-Za-z\-\_]*)$">>), + ok = persistent_term:put(PersistentTermKey, MP0), + MP0 + end, case re:run(Segment, MP, [{capture, all_but_first, binary}]) of {match, [SrcQ, <>, DstQ, KeyQ, ArgsHash]} -> Src = uri_string:unquote(SrcQ), @@ -599,22 +622,26 @@ args_hash(Args) padding => false}). -spec binding_checks(rabbit_types:exchange_name(), - resource_name(), + rabbit_types:r(exchange | queue), rabbit_types:binding_key(), - rabbit_types:user()) -> ok. -binding_checks(SrcXName, DstName, BindingKey, User) -> + rabbit_types:user(), + permission_caches()) -> + permission_caches(). +binding_checks(SrcXName, DstName, BindingKey, User, {PermCache0, TopicPermCache0}) -> lists:foreach(fun(#resource{name = NameBin} = Name) -> ok = prohibit_default_exchange(Name), ok = prohibit_cr_lf(NameBin) end, [SrcXName, DstName]), - ok = check_resource_access(DstName, write, User), - ok = check_resource_access(SrcXName, read, User), - case rabbit_exchange:lookup(SrcXName) of - {ok, SrcX} -> - rabbit_amqp_session:check_read_permitted_on_topic(SrcX, User, BindingKey); - {error, not_found} -> - ok - end. + PermCache1 = check_resource_access(DstName, write, User, PermCache0), + PermCache = check_resource_access(SrcXName, read, User, PermCache1), + TopicPermCache = case rabbit_exchange:lookup(SrcXName) of + {ok, SrcX} -> + check_read_permitted_on_topic( + SrcX, User, BindingKey, TopicPermCache0); + {error, not_found} -> + TopicPermCache0 + end, + {PermCache, TopicPermCache}. binding_action(Action, Binding, Username, ConnPid) -> try rabbit_channel:binding_action(Action, Binding, Username, ConnPid) @@ -641,7 +668,7 @@ prohibit_default_exchange(#resource{kind = exchange, prohibit_default_exchange(_) -> ok. --spec prohibit_reserved_amq(resource_name()) -> ok. +-spec prohibit_reserved_amq(rabbit_types:r(exchange | queue)) -> ok. prohibit_reserved_amq(Res = #resource{name = <<"amq.", _/binary>>}) -> throw(<<"403">>, "~ts starts with reserved prefix 'amq.'", @@ -649,19 +676,6 @@ prohibit_reserved_amq(Res = #resource{name = <<"amq.", _/binary>>}) -> prohibit_reserved_amq(#resource{}) -> ok. --spec check_resource_access(resource_name(), - rabbit_types:permission_atom(), - rabbit_types:user()) -> ok. -check_resource_access(Resource, Perm, User) -> - try rabbit_access_control:check_resource_access(User, Resource, Perm, #{}) - catch exit:#amqp_error{name = access_refused, - explanation = Explanation} -> - %% For authorization failures, let's be more strict: Close the entire - %% AMQP session instead of only returning an HTTP Status Code 403. - rabbit_amqp_util:protocol_error( - ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Explanation, []) - end. - check_vhost_queue_limit(QName = #resource{virtual_host = Vhost}) -> case rabbit_vhost_limit:is_over_queue_limit(Vhost) of false -> @@ -673,17 +687,17 @@ check_vhost_queue_limit(QName = #resource{virtual_host = Vhost}) -> end. -check_dead_letter_exchange(QName = #resource{virtual_host = Vhost}, QArgs, User) -> +check_dead_letter_exchange(QName = #resource{virtual_host = Vhost}, QArgs, User, PermCache0) -> case rabbit_misc:r_arg(Vhost, exchange, QArgs, ?DEAD_LETTER_EXCHANGE_KEY) of undefined -> - ok; + PermCache0; {error, {invalid_type, Type}} -> throw(<<"400">>, "invalid type '~ts' for arg '~s'", [Type, ?DEAD_LETTER_EXCHANGE_KEY]); DLX -> - ok = check_resource_access(QName, read, User), - ok = check_resource_access(DLX, write, User) + PermCache = check_resource_access(QName, read, User, PermCache0), + check_resource_access(DLX, write, User, PermCache) end. -spec absent(amqqueue:amqqueue(), diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 2d3190300eb2..9df3376d06fb 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -485,15 +485,19 @@ handle_1_0_connection_frame( Infos), ok = rabbit_event:notify(connection_created, Infos), ok = rabbit_amqp1_0:register_connection(self()), - ok = send_on_channel0( - Sock, - #'v1_0.open'{channel_max = ClientChannelMax, - max_frame_size = {uint, IncomingMaxFrameSize}, - %% "the value in idle-time-out SHOULD be half the peer's actual timeout threshold" [2.4.5] - idle_time_out = {uint, ReceiveTimeoutMillis div 2}, - container_id = {utf8, rabbit_nodes:cluster_name()}, - offered_capabilities = {array, symbol, [{symbol, <<"LINK_PAIR_V1_0">>}]}, - properties = server_properties()}), + Caps = [%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331306 + {symbol, <<"LINK_PAIR_V1_0">>}, + %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay + {symbol, <<"ANONYMOUS-RELAY">>}], + Open = #'v1_0.open'{ + channel_max = ClientChannelMax, + max_frame_size = {uint, IncomingMaxFrameSize}, + %% "the value in idle-time-out SHOULD be half the peer's actual timeout threshold" [2.4.5] + idle_time_out = {uint, ReceiveTimeoutMillis div 2}, + container_id = {utf8, rabbit_nodes:cluster_name()}, + offered_capabilities = {array, symbol, Caps}, + properties = server_properties()}, + ok = send_on_channel0(Sock, Open), State; handle_1_0_connection_frame(#'v1_0.close'{}, State0) -> State = State0#v1{connection_state = closing}, @@ -881,13 +885,14 @@ check_user_connection_limit(Username) -> %% https://datatracker.ietf.org/doc/html/rfc4422#section-3.8 , or %% 2. Claims Based Security (CBS) extension, see https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html %% and https://github.com/rabbitmq/rabbitmq-server/issues/9259 +%% 3. Simpler variation of 2. where a token is put to a special /token node. %% %% If the user does not refresh their credential on time (the only implementation currently), %% close the entire connection as we must assume that vhost access could have been revoked. %% %% If the user refreshes their credential on time (to be implemented), the AMQP reader should %% 1. rabbit_access_control:check_vhost_access/4 -%% 2. send a message to all its sessions which should then erase the topic permission cache and +%% 2. send a message to all its sessions which should then erase the permission caches and %% re-check all link permissions (i.e. whether reading / writing to exchanges / queues is still allowed). %% 3. cancel the current timer, and set a new timer %% similary as done for Stream connections, see https://github.com/rabbitmq/rabbitmq-server/issues/10292 diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 2aaef011c261..ebc77397a6ce 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -16,6 +16,15 @@ -include("rabbit_amqp.hrl"). -include("mc.hrl"). +-rabbit_deprecated_feature( + {amqp_address_v1, + #{deprecation_phase => permitted_by_default, + messages => + #{when_permitted => + "RabbitMQ AMQP address version 1 is deprecated. " + "Clients should use RabbitMQ AMQP address version 2."}} + }). + -define(PROTOCOL, amqp10). -define(HIBERNATE_AFTER, 6_000). -define(CREDIT_REPLY_TIMEOUT, 30_000). @@ -34,7 +43,6 @@ ?V_1_0_SYMBOL_RELEASED, ?V_1_0_SYMBOL_MODIFIED]). -define(MAX_PERMISSION_CACHE_SIZE, 12). --define(TOPIC_PERMISSION_CACHE, topic_permission_cache). -define(PROCESS_GROUP_NAME, amqp_sessions). -define(UINT(N), {uint, N}). %% This is the link credit that we grant to sending clients. @@ -50,12 +58,14 @@ -define(LINK_CREDIT_RCV, 128). -define(MANAGEMENT_LINK_CREDIT_RCV, 8). -define(MANAGEMENT_NODE_ADDRESS, <<"/management">>). +-define(DEFAULT_EXCHANGE_NAME, <<>>). -export([start_link/8, process_frame/2, list_local/0, conserve_resources/3, - check_read_permitted_on_topic/3 + check_resource_access/4, + check_read_permitted_on_topic/4 ]). -export([init/1, @@ -72,6 +82,15 @@ diff/2, compare/2]). +-type permission_cache() :: [{rabbit_types:r(exchange | queue), + rabbit_types:permission_atom()}]. +-type topic_permission_cache() :: [{rabbit_types:r(topic), + rabbit_types:routing_key(), + rabbit_types:permission_atom()}]. + +-export_type([permission_cache/0, + topic_permission_cache/0]). + %% incoming multi transfer delivery [2.6.14] -record(multi_transfer_msg, { payload_fragments_rev :: [binary(),...], @@ -96,8 +115,14 @@ }). -record(incoming_link, { - exchange :: rabbit_types:exchange() | rabbit_exchange:name(), - routing_key :: undefined | rabbit_types:routing_key(), + %% The exchange is either defined in the ATTACH frame and static for + %% the life time of the link or dynamically provided in each message's + %% "to" field (address v2). + exchange :: rabbit_types:exchange() | rabbit_exchange:name() | to, + %% The routing key is either defined in the ATTACH frame and static for + %% the life time of the link or dynamically provided in each message's + %% "to" field (address v2) or "subject" field (address v1). + routing_key :: rabbit_types:routing_key() | to | subject, %% queue_name_bin is only set if the link target address refers to a queue. queue_name_bin :: undefined | rabbit_misc:resource_name(), delivery_count :: sequence_no(), @@ -256,7 +281,9 @@ %% Queues that got deleted. stashed_eol = [] :: [rabbit_amqqueue:name()], - queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state() + queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state(), + permission_cache = [] :: permission_cache(), + topic_permission_cache = [] :: topic_permission_cache() }). -type state() :: #state{}. @@ -703,13 +730,15 @@ destroy_outgoing_link(Handle, Link = #outgoing_link{queue_name_bin = QNameBin}, destroy_outgoing_link(_, _, _, Acc) -> Acc. -detach(Handle, Link, ErrorCondition) -> - rabbit_log:warning("Detaching link handle ~b due to error condition: ~tp", - [Handle, ErrorCondition]), +detach(Handle, Link, Error = #'v1_0.error'{}) -> + rabbit_log:warning("Detaching link handle ~b due to error: ~tp", + [Handle, Error]), publisher_or_consumer_deleted(Link), #'v1_0.detach'{handle = ?UINT(Handle), closed = true, - error = #'v1_0.error'{condition = ErrorCondition}}. + error = Error}; +detach(Handle, Link, ErrorCondition) -> + detach(Handle, Link, #'v1_0.error'{condition = ErrorCondition}). send_dispositions(Ids, DeliveryState, Writer, ChannelNum) -> Ranges = serial_number:ranges(Ids), @@ -857,11 +886,12 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt) } = Attach, State0 = #state{incoming_links = IncomingLinks0, + permission_cache = PermCache0, cfg = #cfg{vhost = Vhost, user = User}}) -> ok = validate_attach(Attach), - case ensure_target(Target, Vhost, User) of - {ok, Exchange, RoutingKey, QNameBin} -> + case ensure_target(Target, Vhost, User, PermCache0) of + {ok, Exchange, RoutingKey, QNameBin, PermCache} -> IncomingLink = #incoming_link{ exchange = Exchange, routing_key = RoutingKey, @@ -887,11 +917,11 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, %% using a handle which is already associated with a link MUST be responded to %% with an immediate close carrying a handle-in-use session-error." IncomingLinks = IncomingLinks0#{HandleInt => IncomingLink}, - State = State0#state{incoming_links = IncomingLinks}, + State = State0#state{incoming_links = IncomingLinks, + permission_cache = PermCache}, rabbit_global_counters:publisher_created(?PROTOCOL), reply0([Reply, Flow], State); {error, Reason} -> - %% TODO proper link establishment protocol here? protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach rejected: ~tp", [Reason]) @@ -906,25 +936,27 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, max_message_size = MaybeMaxMessageSize} = Attach, State0 = #state{queue_states = QStates0, outgoing_links = OutgoingLinks0, + permission_cache = PermCache0, + topic_permission_cache = TopicPermCache0, cfg = #cfg{vhost = Vhost, user = User = #user{username = Username}, reader_pid = ReaderPid}}) -> ok = validate_attach(Attach), - {SndSettled, EffectiveSndSettleMode} = case SndSettleMode of - ?V_1_0_SENDER_SETTLE_MODE_SETTLED -> - {true, SndSettleMode}; - _ -> - %% In the future, we might want to support sender settle - %% mode mixed where we would expect a settlement from the - %% client only for durable messages. - {false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED} - end, - case ensure_source(Source, Vhost, User) of + {SndSettled, + EffectiveSndSettleMode} = case SndSettleMode of + ?V_1_0_SENDER_SETTLE_MODE_SETTLED -> + {true, SndSettleMode}; + _ -> + %% In the future, we might want to support sender settle + %% mode mixed where we would expect a settlement from the + %% client only for durable messages. + {false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED} + end, + case ensure_source(Source, Vhost, User, PermCache0, TopicPermCache0) of {error, Reason} -> protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach rejected: ~tp", [Reason]); - {ok, QNameBin} -> - QName = rabbit_misc:r(Vhost, queue, QNameBin), - check_read_permitted(QName, User), + {ok, QName = #resource{name = QNameBin}, PermCache1, TopicPermCache} -> + PermCache = check_resource_access(QName, read, User, PermCache1), case rabbit_amqqueue:with( QName, fun(Q) -> @@ -990,7 +1022,9 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, delivery_count = DeliveryCount}, OutgoingLinks = OutgoingLinks0#{HandleInt => Link}, State1 = State0#state{queue_states = QStates, - outgoing_links = OutgoingLinks}, + outgoing_links = OutgoingLinks, + permission_cache = PermCache, + topic_permission_cache = TopicPermCache}, rabbit_global_counters:consumer_created(?PROTOCOL), {ok, [A], State1}; {error, Reason} -> @@ -1669,6 +1703,8 @@ incoming_mgmt_link_transfer( outgoing_management_links = OutgoingLinks, outgoing_pending = Pending, outgoing_delivery_id = OutgoingDeliveryId, + permission_cache = PermCache0, + topic_permission_cache = TopicPermCache0, cfg = #cfg{outgoing_max_frame_size = MaxFrameSize, vhost = Vhost, user = User, @@ -1723,7 +1759,9 @@ incoming_mgmt_link_transfer( [OutgoingCredit]) end, validate_message_size(Request, IncomingMaxMessageSize), - Response = rabbit_amqp_management:handle_request(Request, Vhost, User, ReaderPid), + {Response, + {PermCache, TopicPermCache}} = rabbit_amqp_management:handle_request( + Request, Vhost, User, ReaderPid, {PermCache0, TopicPermCache0}), Transfer = #'v1_0.transfer'{ handle = ?UINT(OutgoingHandleInt), @@ -1749,7 +1787,9 @@ incoming_mgmt_link_transfer( outgoing_delivery_id = add(OutgoingDeliveryId, 1), outgoing_pending = queue:in(PendingTransfer, Pending), incoming_management_links = maps:update(IncomingHandleInt, IncomingLink, IncomingLinks), - outgoing_management_links = maps:update(OutgoingHandleInt, OutgoingLink, OutgoingLinks)}, + outgoing_management_links = maps:update(OutgoingHandleInt, OutgoingLink, OutgoingLinks), + permission_cache = PermCache, + topic_permission_cache = TopicPermCache}, {Reply, State}. incoming_link_transfer( @@ -1802,7 +1842,7 @@ incoming_link_transfer( rcv_settle_mode = RcvSettleMode, handle = Handle = ?UINT(HandleInt)}, MsgPart, - #incoming_link{exchange = Exchange, + #incoming_link{exchange = LinkExchange, routing_key = LinkRKey, delivery_count = DeliveryCount0, incoming_unconfirmed_map = U0, @@ -1810,7 +1850,10 @@ incoming_link_transfer( multi_transfer_msg = MultiTransfer } = Link0, State0 = #state{queue_states = QStates0, + permission_cache = PermCache0, + topic_permission_cache = TopicPermCache0, cfg = #cfg{user = User = #user{username = Username}, + vhost = Vhost, trace_state = Trace, conn_name = ConnName, channel_num = ChannelNum}}) -> @@ -1834,15 +1877,14 @@ incoming_link_transfer( Sections = amqp10_framing:decode_bin(MsgBin), ?DEBUG("~s Inbound content:~n ~tp", [?MODULE, [amqp10_framing:pprint(Section) || Section <- Sections]]), - case rabbit_exchange_lookup(Exchange) of - {ok, X = #exchange{name = #resource{name = XNameBin}}} -> - Anns = #{?ANN_EXCHANGE => XNameBin}, - Mc0 = mc:init(mc_amqp, Sections, Anns), - {RoutingKey, Mc1} = ensure_routing_key(LinkRKey, Mc0), + Mc0 = mc:init(mc_amqp, Sections, #{}), + case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of + {ok, X, RoutingKey, Mc1, PermCache} -> Mc = rabbit_message_interceptor:intercept(Mc1), check_user_id(Mc, User), + TopicPermCache = check_write_permitted_on_topic( + X, User, RoutingKey, TopicPermCache0), messages_received(Settled), - check_write_permitted_on_topic(X, User, RoutingKey), QNames = rabbit_exchange:route(X, Mc, #{return_binding_keys => true}), rabbit_trace:tap_in(Mc, QNames, ConnName, ChannelNum, Username, Trace), Opts = #{correlation => {HandleInt, DeliveryId}}, @@ -1850,7 +1892,9 @@ incoming_link_transfer( Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of {ok, QStates, Actions} -> - State1 = State0#state{queue_states = QStates}, + State1 = State0#state{queue_states = QStates, + permission_cache = PermCache, + topic_permission_cache = TopicPermCache}, %% Confirms must be registered before processing actions %% because actions may contain rejections of publishes. {U, Reply0} = process_routing_confirm( @@ -1873,35 +1917,68 @@ incoming_link_transfer( "delivery_tag=~p, delivery_id=~p, reason=~p", [DeliveryTag, DeliveryId, Reason]) end; - {error, not_found} -> + {error, not_found, XName} -> Disposition = released(DeliveryId), - Detach = detach(HandleInt, Link0, ?V_1_0_AMQP_ERROR_RESOURCE_DELETED), + Description = unicode:characters_to_binary("no " ++ rabbit_misc:rs(XName)), + Err = #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, + description = {utf8, Description}}, + Detach = detach(HandleInt, Link0, Err), {error, [Disposition, Detach]} end. -ensure_routing_key(LinkRKey, Mc0) -> - RKey = case LinkRKey of - undefined -> +lookup_target(#exchange{} = X, LinkRKey, Mc, _, _, PermCache) -> + lookup_routing_key(X, LinkRKey, Mc, PermCache); +lookup_target(#resource{} = XName, LinkRKey, Mc, _, _, PermCache) -> + case rabbit_exchange:lookup(XName) of + {ok, X} -> + lookup_routing_key(X, LinkRKey, Mc, PermCache); + {error, not_found} -> + {error, not_found, XName} + end; +lookup_target(to, to, Mc, Vhost, User, PermCache0) -> + case mc:property(to, Mc) of + {utf8, String} -> + case parse_target_v2_string(String) of + {ok, XNameBin, RKey, _} -> + XName = rabbit_misc:r(Vhost, exchange, XNameBin), + PermCache = check_resource_access(XName, write, User, PermCache0), + case rabbit_exchange:lookup(XName) of + {ok, X} -> + check_internal_exchange(X), + lookup_routing_key(X, RKey, Mc, PermCache); + {error, not_found} -> + {error, not_found, XName} + end; + {error, bad_address} -> + protocol_error( + ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + "bad 'to' address string: ~ts", + [String]) + end; + undefined -> + protocol_error( + ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + "anonymous terminus requires 'to' address to be set", + []) + end. + +lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}}, + RKey0, Mc0, PermCache) -> + RKey = case RKey0 of + subject -> case mc:property(subject, Mc0) of - undefined -> - %% Set the default routing key of AMQP 0.9.1 'basic.publish'{}. - %% For example, when the client attached to target /exchange/amq.fanout and sends a - %% message without setting a 'subject' in the message properties, the routing key is - %% ignored during routing, but receiving code paths still expect some routing key to be set. - <<"">>; {utf8, Subject} -> - Subject + Subject; + undefined -> + <<>> end; - _ -> - LinkRKey + _ when is_binary(RKey0) -> + RKey0 end, - Mc = mc:set_annotation(?ANN_ROUTING_KEYS, [RKey], Mc0), - {RKey, Mc}. - -rabbit_exchange_lookup(X = #exchange{}) -> - {ok, X}; -rabbit_exchange_lookup(XName = #resource{}) -> - rabbit_exchange:lookup(XName). + Mc1 = mc:set_annotation(?ANN_EXCHANGE, XNameBin, Mc0), + Mc = mc:set_annotation(?ANN_ROUTING_KEYS, [RKey], Mc1), + {ok, X, RKey, Mc, PermCache}. process_routing_confirm([], _SenderSettles = true, _, U) -> rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1), @@ -1957,42 +2034,208 @@ maybe_grant_mgmt_link_credit(Credit, DeliveryCount, Handle) maybe_grant_mgmt_link_credit(Credit, _, _) -> {Credit, []}. -%% TODO default-outcome and outcomes, dynamic lifetimes -ensure_target(#'v1_0.target'{dynamic = true}, _, _) -> - protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, - "Dynamic targets not supported", []); -ensure_target(#'v1_0.target'{address = Address, - durable = Durable}, Vhost, User) -> +-spec ensure_source(#'v1_0.source'{}, + rabbit_types:vhost(), + rabbit_types:user(), + permission_cache(), + topic_permission_cache()) -> + {ok, rabbit_amqqueue:name(), permission_cache(), topic_permission_cache()} | + {error, term()}. +ensure_source(#'v1_0.source'{dynamic = true}, _, _, _, _) -> + not_implemented("Dynamic sources not supported"); +ensure_source(#'v1_0.source'{address = Address, + durable = Durable}, + Vhost, User, PermCache, TopicPermCache) -> case Address of - {utf8, Destination} -> - case rabbit_routing_parser:parse_endpoint(Destination, true) of - {ok, Dest} -> - QNameBin = ensure_terminus(target, Dest, Vhost, User, Durable), - {XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest), - XNameBin = unicode:characters_to_binary(XNameList1), + {utf8, SourceAddr} -> + case address_v1_permitted() of + true -> ensure_source_v1( + SourceAddr, Vhost, User, Durable, PermCache, TopicPermCache); + false -> ensure_source_v2( + SourceAddr, Vhost, PermCache, TopicPermCache) + end; + _ -> + {error, {bad_address, Address}} + end. + +ensure_source_v1(Address, + Vhost, + User = #user{username = Username}, + Durable, + PermCache0, + TopicPermCache0) -> + case rabbit_routing_parser:parse_endpoint(Address, false) of + {ok, Src} -> + {QNameBin, PermCache1} = ensure_terminus(source, Src, Vhost, User, Durable, PermCache0), + case rabbit_routing_parser:parse_routing(Src) of + {"", QNameList} -> + true = string:equal(QNameList, QNameBin), + QName = rabbit_misc:r(Vhost, queue, QNameBin), + {ok, QName, PermCache1, TopicPermCache0}; + {XNameList, RoutingKeyList} -> + RoutingKey = unicode:characters_to_binary(RoutingKeyList), + XNameBin = unicode:characters_to_binary(XNameList), XName = rabbit_misc:r(Vhost, exchange, XNameBin), + QName = rabbit_misc:r(Vhost, queue, QNameBin), + Binding = #binding{source = XName, + destination = QName, + key = RoutingKey}, + PermCache2 = check_resource_access(QName, write, User, PermCache1), + PermCache = check_resource_access(XName, read, User, PermCache2), {ok, X} = rabbit_exchange:lookup(XName), - check_internal_exchange(X), - check_write_permitted(XName, User), - %% Pre-declared exchanges are protected against deletion and modification. - %% Let's cache the whole #exchange{} record to save a - %% rabbit_exchange:lookup(XName) call each time we receive a message. - Exchange = case XNameBin of - <<>> -> X; - <<"amq.", _/binary>> -> X; - _ -> XName - end, - RoutingKey = case RK of - undefined -> undefined; - [] -> undefined; - _ -> unicode:characters_to_binary(RK) - end, - {ok, Exchange, RoutingKey, QNameBin}; - {error, _} = E -> - E + TopicPermCache = check_read_permitted_on_topic( + X, User, RoutingKey, TopicPermCache0), + case rabbit_binding:add(Binding, Username) of + ok -> + {ok, QName, PermCache, TopicPermCache}; + {error, _} = Err -> + Err + end end; - _Else -> - {error, {address_not_utf8_string, Address}} + {error, _} -> + ensure_source_v2(Address, Vhost, PermCache0, TopicPermCache0) + end. + +%% The only possible v2 source address format is: +%% /queue/:queue +ensure_source_v2(<<"/queue/", QNameBin/binary>>, Vhost, PermCache, TopicPermCache) -> + QName = rabbit_misc:r(Vhost, queue, QNameBin), + ok = exit_if_absent(QName), + {ok, QName, PermCache, TopicPermCache}; +ensure_source_v2(Address, _, _, _) -> + {error, {bad_address, Address}}. + +-spec ensure_target(#'v1_0.target'{}, + rabbit_types:vhost(), + rabbit_types:user(), + permission_cache()) -> + {ok, + rabbit_types:exchange() | rabbit_exchange:name() | to, + rabbit_types:routing_key() | to | subject, + rabbit_misc:resource_name() | undefined, + permission_cache()} | + {error, term()}. +ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) -> + not_implemented("Dynamic targets not supported"); +ensure_target(#'v1_0.target'{address = Address, + durable = Durable}, + Vhost, User, PermCache) -> + case address_v1_permitted() of + true -> + try_target_v1(Address, Vhost, User, Durable, PermCache); + false -> + try_target_v2(Address, Vhost, User, PermCache) + end. + +try_target_v1(Address, Vhost, User, Durable, PermCache0) -> + case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of + {ok, XNameBin, RKey, QNameBin, PermCache} -> + check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache); + {error, _} -> + try_target_v2(Address, Vhost, User, PermCache0) + end. + +try_target_v2(Address, Vhost, User, PermCache) -> + case ensure_target_v2(Address, Vhost) of + {ok, to, RKey, QNameBin} -> + {ok, to, RKey, QNameBin, PermCache}; + {ok, XNameBin, RKey, QNameBin} -> + check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache); + {error, _} = Err -> + Err + end. + +check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) -> + XName = rabbit_misc:r(Vhost, exchange, XNameBin), + PermCache = check_resource_access(XName, write, User, PermCache0), + case rabbit_exchange:lookup(XName) of + {ok, X} -> + check_internal_exchange(X), + %% Pre-declared exchanges are protected against deletion and modification. + %% Let's cache the whole #exchange{} record to save a + %% rabbit_exchange:lookup(XName) call each time we receive a message. + Exchange = case XNameBin of + ?DEFAULT_EXCHANGE_NAME -> X; + <<"amq.", _/binary>> -> X; + _ -> XName + end, + {ok, Exchange, RKey, QNameBin, PermCache}; + {error, not_found} -> + not_found(XName) + end. + +ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) -> + case rabbit_routing_parser:parse_endpoint(Address, true) of + {ok, Dest} -> + {QNameBin, PermCache} = ensure_terminus( + target, Dest, Vhost, User, Durable, PermCache0), + {XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest), + XNameBin = unicode:characters_to_binary(XNameList1), + RoutingKey = case RK of + undefined -> subject; + [] -> subject; + _ -> unicode:characters_to_binary(RK) + end, + {ok, XNameBin, RoutingKey, QNameBin, PermCache}; + {error, _} = Err -> + Err + end; +ensure_target_v1(Address, _, _, _, _) -> + {error, {bad_address, Address}}. + +%% The possible v2 target address formats are: +%% /exchange/:exchange/key/:routing-key +%% /exchange/:exchange +%% /queue/:queue +%% +ensure_target_v2({utf8, String}, Vhost) -> + case parse_target_v2_string(String) of + {ok, _XNameBin, _RKey, undefined} = Ok -> + Ok; + {ok, _XNameBin, _RKey, QNameBin} = Ok -> + ok = exit_if_absent(queue, Vhost, QNameBin), + Ok; + {error, bad_address} -> + {error, {bad_address_string, String}} + end; +ensure_target_v2(undefined, _) -> + %% anonymous terminus + %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay + {ok, to, to, undefined}; +ensure_target_v2(Address, _) -> + {error, {bad_address, Address}}. + +parse_target_v2_string(<<"/exchange/", Rest/binary>>) -> + case split_exchange_target(Rest) of + {?DEFAULT_EXCHANGE_NAME, _} -> + {error, bad_address}; + {<<"amq.default">>, _} -> + {error, bad_address}; + {XNameBin, RKey} -> + {ok, XNameBin, RKey, undefined} + end; +parse_target_v2_string(<<"/queue/">>) -> + %% empty queue name is invalid + {error, bad_address}; +parse_target_v2_string(<<"/queue/", QNameBin/binary>>) -> + {ok, ?DEFAULT_EXCHANGE_NAME, QNameBin, QNameBin}; +parse_target_v2_string(_) -> + {error, bad_address}. + +%% Empty exchange name (default exchange) is valid. +split_exchange_target(Target) -> + Key = cp_amqp_target_address, + Pattern = try persistent_term:get(Key) + catch error:badarg -> + Cp = binary:compile_pattern(<<"/key/">>), + ok = persistent_term:put(Key, Cp), + Cp + end, + case binary:split(Target, Pattern) of + [XNameBin] -> + {XNameBin, <<>>}; + [XNameBin, RoutingKey] -> + {XNameBin, RoutingKey} end. handle_outgoing_mgmt_link_flow_control( @@ -2152,47 +2395,6 @@ credit_reply_timeout(QType, QName) -> default(undefined, Default) -> Default; default(Thing, _Default) -> Thing. -ensure_source(#'v1_0.source'{dynamic = true}, _, _) -> - protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, "Dynamic sources not supported", []); -ensure_source(#'v1_0.source'{address = Address, - durable = Durable}, - Vhost, - User = #user{username = Username}) -> - case Address of - {utf8, SourceAddr} -> - case rabbit_routing_parser:parse_endpoint(SourceAddr, false) of - {ok, Src} -> - QNameBin = ensure_terminus(source, Src, Vhost, User, Durable), - case rabbit_routing_parser:parse_routing(Src) of - {"", QNameList} -> - true = string:equal(QNameList, QNameBin), - {ok, QNameBin}; - {XNameList, RoutingKeyList} -> - RoutingKey = unicode:characters_to_binary(RoutingKeyList), - XNameBin = unicode:characters_to_binary(XNameList), - XName = rabbit_misc:r(Vhost, exchange, XNameBin), - QName = rabbit_misc:r(Vhost, queue, QNameBin), - Binding = #binding{source = XName, - destination = QName, - key = RoutingKey}, - check_write_permitted(QName, User), - check_read_permitted(XName, User), - {ok, X} = rabbit_exchange:lookup(XName), - check_read_permitted_on_topic(X, User, RoutingKey), - case rabbit_binding:add(Binding, Username) of - ok -> - {ok, QNameBin}; - {error, _} = Err -> - Err - end - end; - {error, _} = Err -> - Err - end; - _ -> - {error, {address_not_utf8_string, Address}} - end. - transfer_frames(Transfer, Sections, unlimited) -> [[Transfer, Sections]]; transfer_frames(Transfer, Sections, MaxFrameSize) -> @@ -2281,20 +2483,16 @@ keyfind_unpack_described(Key, KvList) -> end. validate_attach(#'v1_0.attach'{target = #'v1_0.coordinator'{}}) -> - protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, - "Transactions not supported", []); -validate_attach(#'v1_0.attach'{unsettled = Unsettled, - incomplete_unsettled = IncompleteSettled}) - when Unsettled =/= undefined andalso Unsettled =/= {map, []} orelse - IncompleteSettled =:= true -> - protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, - "Link recovery not supported", []); + not_implemented("Transactions not supported"); +validate_attach(#'v1_0.attach'{unsettled = {map, [_|_]}}) -> + not_implemented("Link recovery not supported"); +validate_attach(#'v1_0.attach'{incomplete_unsettled = true}) -> + not_implemented("Link recovery not supported"); validate_attach( #'v1_0.attach'{snd_settle_mode = SndSettleMode, rcv_settle_mode = ?V_1_0_RECEIVER_SETTLE_MODE_SECOND}) when SndSettleMode =/= ?V_1_0_SENDER_SETTLE_MODE_SETTLED -> - protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, - "rcv-settle-mode second not supported", []); + not_implemented("rcv-settle-mode second not supported"); validate_attach(#'v1_0.attach'{}) -> ok. @@ -2328,8 +2526,7 @@ validate_multi_transfer_settled(Other, First) %% "If the message is being sent settled by the sender, %% the value of this field [rcv-settle-mode] is ignored." [2.7.5] validate_transfer_rcv_settle_mode(?V_1_0_RECEIVER_SETTLE_MODE_SECOND, _Settled = false) -> - protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, - "rcv-settle-mode second not supported", []); + not_implemented("rcv-settle-mode second not supported"); validate_transfer_rcv_settle_mode(_, _) -> ok. @@ -2356,52 +2553,66 @@ validate_message_size(Message, MaxMsgSize) [MsgSize, MaxMsgSize]) end. -ensure_terminus(Type, {exchange, {XNameList, _RoutingKey}}, Vhost, User, Durability) -> +-spec ensure_terminus(source | target, + term(), + rabbit_types:vhost(), + rabbit_types:user(), + {uint, 0..2}, + permission_cache()) -> + {undefined | rabbit_misc:resource_name(), + permission_cache()}. +ensure_terminus(Type, {exchange, {XNameList, _RoutingKey}}, Vhost, User, Durability, PermCache) -> ok = exit_if_absent(exchange, Vhost, XNameList), case Type of - target -> undefined; - source -> declare_queue(generate_queue_name(), Vhost, User, Durability) + target -> {undefined, PermCache}; + source -> declare_queue(generate_queue_name(), Vhost, User, Durability, PermCache) end; -ensure_terminus(target, {topic, _bindingkey}, _, _, _) -> +ensure_terminus(target, {topic, _bindingkey}, _, _, _, PermCache) -> %% exchange amq.topic exists - undefined; -ensure_terminus(source, {topic, _BindingKey}, Vhost, User, Durability) -> + {undefined, PermCache}; +ensure_terminus(source, {topic, _BindingKey}, Vhost, User, Durability, PermCache) -> %% exchange amq.topic exists - declare_queue(generate_queue_name(), Vhost, User, Durability); -ensure_terminus(target, {queue, undefined}, _, _, _) -> + declare_queue(generate_queue_name(), Vhost, User, Durability, PermCache); +ensure_terminus(target, {queue, undefined}, _, _, _, PermCache) -> %% Target "/queue" means publish to default exchange with message subject as routing key. %% Default exchange exists. - undefined; -ensure_terminus(_, {queue, QNameList}, Vhost, User, Durability) -> - declare_queue(unicode:characters_to_binary(QNameList), Vhost, User, Durability); -ensure_terminus(_, {amqqueue, QNameList}, Vhost, _, _) -> + {undefined, PermCache}; +ensure_terminus(_, {queue, QNameList}, Vhost, User, Durability, PermCache) -> + declare_queue(unicode:characters_to_binary(QNameList), Vhost, User, Durability, PermCache); +ensure_terminus(_, {amqqueue, QNameList}, Vhost, _, _, PermCache) -> %% Target "/amq/queue/" is handled specially due to AMQP legacy: %% "Queue names starting with "amq." are reserved for pre-declared and %% standardised queues. The client MAY declare a queue starting with "amq." %% if the passive option is set, or the queue already exists." QNameBin = unicode:characters_to_binary(QNameList), ok = exit_if_absent(queue, Vhost, QNameBin), - QNameBin. + {QNameBin, PermCache}. + +exit_if_absent(Kind, Vhost, Name) when is_list(Name) -> + exit_if_absent(Kind, Vhost, unicode:characters_to_binary(Name)); +exit_if_absent(Kind, Vhost, Name) when is_binary(Name) -> + exit_if_absent(rabbit_misc:r(Vhost, Kind, Name)). -exit_if_absent(Kind, Vhost, Name) -> - ResourceName = rabbit_misc:r(Vhost, Kind, unicode:characters_to_binary(Name)), +exit_if_absent(ResourceName = #resource{kind = Kind}) -> Mod = case Kind of exchange -> rabbit_exchange; queue -> rabbit_amqqueue end, case Mod:exists(ResourceName) of - true -> - ok; - false -> - protocol_error(?V_1_0_AMQP_ERROR_NOT_FOUND, "no ~ts", [rabbit_misc:rs(ResourceName)]) + true -> ok; + false -> not_found(ResourceName) end. generate_queue_name() -> rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen"). -declare_queue(QNameBin, Vhost, User = #user{username = Username}, TerminusDurability) -> +declare_queue(QNameBin, + Vhost, + User = #user{username = Username}, + TerminusDurability, + PermCache0) -> QName = rabbit_misc:r(Vhost, queue, QNameBin), - check_configure_permitted(QName, User), + PermCache = check_resource_access(QName, configure, User, PermCache0), check_vhost_queue_limit(Vhost, QName), rabbit_core_metrics:queue_declared(QName), Q0 = amqqueue:new(QName, @@ -2415,13 +2626,15 @@ declare_queue(QNameBin, Vhost, User = #user{username = Username}, TerminusDurabi rabbit_classic_queue), case rabbit_queue_type:declare(Q0, node()) of {new, _Q} -> - rabbit_core_metrics:queue_created(QName), - QNameBin; + rabbit_core_metrics:queue_created(QName); {existing, _Q} -> - QNameBin; + ok; Other -> - protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, "Failed to declare ~s: ~p", [rabbit_misc:rs(QName), Other]) - end. + protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, + "Failed to declare ~s: ~p", + [rabbit_misc:rs(QName), Other]) + end, + {QNameBin, PermCache}. outcomes(#'v1_0.source'{outcomes = undefined}) -> {array, symbol, ?OUTCOMES}; @@ -2430,16 +2643,10 @@ outcomes(#'v1_0.source'{outcomes = {array, symbol, Syms} = Outcomes}) -> [] -> Outcomes; Unsupported -> - rabbit_amqp_util:protocol_error( - ?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, - "Outcomes not supported: ~tp", - [Unsupported]) + not_implemented("Outcomes not supported: ~tp", [Unsupported]) end; outcomes(#'v1_0.source'{outcomes = Unsupported}) -> - rabbit_amqp_util:protocol_error( - ?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, - "Outcomes not supported: ~tp", - [Unsupported]); + not_implemented("Outcomes not supported: ~tp", [Unsupported]); outcomes(_) -> {array, symbol, ?OUTCOMES}. @@ -2586,48 +2793,63 @@ maybe_detach_mgmt_link( check_internal_exchange(#exchange{internal = true, name = XName}) -> protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, - "attach to internal ~ts is forbidden", + "forbidden to publish to internal ~ts", [rabbit_misc:rs(XName)]); check_internal_exchange(_) -> ok. -check_write_permitted(Resource, User) -> - check_resource_access(Resource, User, write). - -check_read_permitted(Resource, User) -> - check_resource_access(Resource, User, read). - -check_configure_permitted(Resource, User) -> - check_resource_access(Resource, User, configure). - -check_resource_access(Resource, User, Perm) -> - Context = #{}, - ok = try rabbit_access_control:check_resource_access(User, Resource, Perm, Context) - catch exit:#amqp_error{name = access_refused, - explanation = Msg} -> - protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Msg, []) - end. - -check_write_permitted_on_topic(Resource, User, RoutingKey) -> - check_topic_authorisation(Resource, User, RoutingKey, write). +-spec check_resource_access(rabbit_types:r(exchange | queue), + rabbit_types:permission_atom(), + rabbit_types:user(), + permission_cache()) -> + permission_cache(). +check_resource_access(Resource, Perm, User, Cache) -> + CacheElem = {Resource, Perm}, + case lists:member(CacheElem, Cache) of + true -> + Cache; + false -> + Context = #{}, + try rabbit_access_control:check_resource_access(User, Resource, Perm, Context) of + ok -> + CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1), + [CacheElem | CacheTail] + catch + exit:#amqp_error{name = access_refused, + explanation = Msg} -> + protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Msg, []) + end + end. -check_read_permitted_on_topic(Resource, User, RoutingKey) -> - check_topic_authorisation(Resource, User, RoutingKey, read). +-spec check_write_permitted_on_topic( + rabbit_types:exchange(), + rabbit_types:user(), + rabbit_types:routing_key(), + topic_permission_cache()) -> + topic_permission_cache(). +check_write_permitted_on_topic(Resource, User, RoutingKey, TopicPermCache) -> + check_topic_authorisation(Resource, User, RoutingKey, write, TopicPermCache). + +-spec check_read_permitted_on_topic( + rabbit_types:exchange(), + rabbit_types:user(), + rabbit_types:routing_key(), + topic_permission_cache()) -> + topic_permission_cache(). +check_read_permitted_on_topic(Resource, User, RoutingKey, TopicPermCache) -> + check_topic_authorisation(Resource, User, RoutingKey, read, TopicPermCache). check_topic_authorisation(#exchange{type = topic, name = XName = #resource{virtual_host = VHost}}, User = #user{username = Username}, RoutingKey, - Permission) -> + Permission, + Cache) -> Resource = XName#resource{kind = topic}, CacheElem = {Resource, RoutingKey, Permission}, - Cache = case get(?TOPIC_PERMISSION_CACHE) of - undefined -> []; - List -> List - end, case lists:member(CacheElem, Cache) of true -> - ok; + Cache; false -> VariableMap = #{<<"vhost">> => VHost, <<"username">> => Username}, @@ -2636,16 +2858,15 @@ check_topic_authorisation(#exchange{type = topic, try rabbit_access_control:check_topic_access(User, Resource, Permission, Context) of ok -> CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1), - put(?TOPIC_PERMISSION_CACHE, [CacheElem | CacheTail]), - ok + [CacheElem | CacheTail] catch exit:#amqp_error{name = access_refused, explanation = Msg} -> protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Msg, []) end end; -check_topic_authorisation(_, _, _, _) -> - ok. +check_topic_authorisation(_, _, _, _, Cache) -> + Cache. check_vhost_queue_limit(Vhost, QName) -> case rabbit_vhost_limit:is_over_queue_limit(Vhost) of @@ -2701,6 +2922,23 @@ property_paired_not_set() -> protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Link property 'paired' is not set to boolean value 'true'", []). +-spec not_implemented(io:format()) -> no_return(). +not_implemented(Format) -> + not_implemented(Format, []). + +-spec not_implemented(io:format(), [term()]) -> no_return(). +not_implemented(Format, Args) -> + protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, Format, Args). + +-spec not_found(rabbit_types:r(exchange | queue)) -> no_return(). +not_found(Resource) -> + protocol_error(?V_1_0_AMQP_ERROR_NOT_FOUND, + "no ~ts", + [rabbit_misc:rs(Resource)]). + +address_v1_permitted() -> + rabbit_deprecated_features:is_permitted(amqp_address_v1). + format_status( #{state := #state{cfg = Cfg, outgoing_pending = OutgoingPending, @@ -2720,7 +2958,9 @@ format_status( stashed_settled = StashedSettled, stashed_down = StashedDown, stashed_eol = StashedEol, - queue_states = QueueStates}} = Status) -> + queue_states = QueueStates, + permission_cache = PermissionCache, + topic_permission_cache = TopicPermissionCache}} = Status) -> State = #{cfg => Cfg, outgoing_pending => queue:len(OutgoingPending), remote_incoming_window => RemoteIncomingWindow, @@ -2739,5 +2979,7 @@ format_status( stashed_settled => StashedSettled, stashed_down => StashedDown, stashed_eol => StashedEol, - queue_states => rabbit_queue_type:format_status(QueueStates)}, + queue_states => rabbit_queue_type:format_status(QueueStates), + permission_cache => PermissionCache, + topic_permission_cache => TopicPermissionCache}, maps:update(state, State, Status). diff --git a/deps/rabbit/test/amqp_address_SUITE.erl b/deps/rabbit/test/amqp_address_SUITE.erl new file mode 100644 index 000000000000..41cdac735ff6 --- /dev/null +++ b/deps/rabbit/test/amqp_address_SUITE.erl @@ -0,0 +1,617 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 VMware, Inc. or its affiliates. All rights reserved. + +-module(amqp_address_SUITE). + +-compile([export_all, + nowarn_export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). +-include_lib("rabbitmq_amqp_client/include/rabbitmq_amqp_client.hrl"). + +-import(rabbit_ct_broker_helpers, + [rpc/4]). +-import(rabbit_ct_helpers, + [eventually/1]). + +all() -> + [ + {group, v1_permitted}, + {group, v2} + ]. + +groups() -> + [ + {v1_permitted, [shuffle], + common_tests() + }, + {v2, [shuffle], + [ + target_queue_absent, + source_queue_absent, + target_bad_v2_address, + source_bad_v2_address + ] ++ common_tests() + } + ]. + +common_tests() -> + [ + target_exchange_routing_key, + target_exchange_routing_key_with_slash, + target_exchange_routing_key_empty, + target_exchange, + target_exchange_absent, + queue, + queue_with_slash, + target_per_message_exchange_routing_key, + target_per_message_exchange, + target_per_message_queue, + target_per_message_unset_to_address, + target_per_message_bad_to_address, + target_per_message_exchange_absent, + target_bad_address, + source_bad_address + ]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(amqp10_client), + rabbit_ct_helpers:log_environment(), + Config. + +end_per_suite(Config) -> + Config. + +init_per_group(Group, Config0) -> + PermitV1 = case Group of + v1_permitted -> true; + v2 -> false + end, + Config = rabbit_ct_helpers:merge_app_env( + Config0, + {rabbit, + [{permit_deprecated_features, + #{amqp_address_v1 => PermitV1} + }] + }), + rabbit_ct_helpers:run_setup_steps( + Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% Test v2 target address +%% /exchange/:exchange/key/:routing-key +target_exchange_routing_key(Config) -> + XName = <<"๐Ÿ‘‰"/utf8>>, + RKey = <<"๐Ÿ—๏ธ"/utf8>>, + target_exchange_routing_key0(XName, RKey, Config). + +%% Test v2 target address +%% /exchange/:exchange/key/:routing-key +%% where both :exchange and :routing-key contains a "/" character. +target_exchange_routing_key_with_slash(Config) -> + XName = <<"my/exchange">>, + RKey = <<"my/key">>, + target_exchange_routing_key0(XName, RKey, Config). + +target_exchange_routing_key0(XName, RKey, Config) -> + TargetAddr = <<"/exchange/", XName/binary, "/key/", RKey/binary>>, + QName = atom_to_binary(?FUNCTION_NAME), + + Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), + ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, RKey, #{}), + SrcAddr = <<"/queue/", QName/binary>>, + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SrcAddr), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), + ok = wait_for_credit(Sender), + Body = <<"body">>, + Msg0 = amqp10_msg:new(<<"tag">>, Body, true), + %% Although mc_amqp:essential_properties/1 parses these annotations, they should be ignored. + Msg1 = amqp10_msg:set_message_annotations( + #{<<"x-exchange">> => <<"ignored">>, + <<"x-routing-key">> => <<"ignored">>}, + Msg0), + ok = amqp10_client:send_msg(Sender, Msg1), + + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual([Body], amqp10_msg:body(Msg)), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName), + ok = cleanup(Init). + +%% Test v2 target address +%% /exchange/:exchange/key/ +%% Routing key is empty. +target_exchange_routing_key_empty(Config) -> + XName = <<"amq.fanout">>, + QName = atom_to_binary(?FUNCTION_NAME), + TargetAddr = <<"/exchange/", XName/binary, "/key/">>, + + Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, <<"ignored">>, #{}), + SrcAddr = <<"/queue/", QName/binary>>, + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SrcAddr), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), + ok = wait_for_credit(Sender), + Body = <<"body">>, + Msg0 = amqp10_msg:new(<<"tag">>, Body, true), + ok = amqp10_client:send_msg(Sender, Msg0), + + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual([Body], amqp10_msg:body(Msg)), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = cleanup(Init). + +%% Test v2 target address +%% /exchange/:exchange +%% Routing key is empty. +target_exchange(Config) -> + XName = <<"amq.fanout">>, + TargetAddr = <<"/exchange/", XName/binary>>, + QName = atom_to_binary(?FUNCTION_NAME), + + Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, <<"ignored">>, #{}), + SrcAddr = <<"/queue/", QName/binary>>, + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SrcAddr), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), + ok = wait_for_credit(Sender), + Body = <<"body">>, + Msg0 = amqp10_msg:new(<<"tag">>, Body, true), + ok = amqp10_client:send_msg(Sender, Msg0), + + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual([Body], amqp10_msg:body(Msg)), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = cleanup(Init). + +%% Test v2 target address +%% /exchange/:exchange +%% where the target exchange does not exist. +target_exchange_absent(Config) -> + XName = <<"๐ŸŽˆ"/utf8>>, + TargetAddr = <<"/exchange/", XName/binary>>, + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + {ok, _Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), + receive + {amqp10_event, + {session, Session, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, + description = {utf8, <<"no exchange '", XName:(byte_size(XName))/binary, + "' in vhost '/'">>}}}}} -> ok + after 5000 -> + Reason = {missing_event, ?LINE}, + flush(Reason), + ct:fail(Reason) + end, + ok = amqp10_client:close_connection(Connection). + +%% Test v2 target and source address +%% /queue/:queue +queue(Config) -> + QName = <<"๐ŸŽˆ"/utf8>>, + queue0(QName, Config). + +%% Test v2 target and source address +%% /queue/:queue +%% where :queue contains a "/" character. +queue_with_slash(Config) -> + QName = <<"my/queue">>, + queue0(QName, Config). + +queue0(QName, Config) -> + Addr = <<"/queue/", QName/binary>>, + + Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Addr), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Addr), + ok = wait_for_credit(Sender), + Body = <<"body">>, + Msg0 = amqp10_msg:new(<<"tag">>, Body, true), + ok = amqp10_client:send_msg(Sender, Msg0), + + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual([Body], amqp10_msg:body(Msg)), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = cleanup(Init). + +%% Test v2 target address +%% /queue/:queue +%% where the target queue does not exist. +target_queue_absent(Config) -> + QName = <<"๐ŸŽˆ"/utf8>>, + TargetAddr = <<"/queue/", QName/binary>>, + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + {ok, _Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), + receive + {amqp10_event, + {session, Session, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, + description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary, + "' in vhost '/'">>}}}}} -> ok + after 5000 -> + Reason = {missing_event, ?LINE}, + flush(Reason), + ct:fail(Reason) + end, + ok = amqp10_client:close_connection(Connection). + +%% Test v2 target address 'null' and 'to' +%% /exchange/:exchange/key/:routing-key +%% with varying routing keys. +target_per_message_exchange_routing_key(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + DirectX = <<"amq.direct">>, + RKey1 = <<"๐Ÿ—๏ธ1"/utf8>>, + RKey2 = <<"๐Ÿ—๏ธ2"/utf8>>, + To1 = <<"/exchange/", DirectX/binary, "/key/", RKey1/binary>>, + To2 = <<"/exchange/", DirectX/binary, "/key/", RKey2/binary>>, + + Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, DirectX, RKey1, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, DirectX, RKey2, #{}), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null), + ok = wait_for_credit(Sender), + + Tag1 = Body1 = <<1>>, + Tag2 = Body2 = <<2>>, + + %% Although mc_amqp:essential_properties/1 parses these annotations, they should be ignored. + Msg1 = amqp10_msg:set_message_annotations( + #{<<"x-exchange">> => <<"ignored">>, + <<"x-routing-key">> => <<"ignored">>}, + amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, Body1))), + Msg2 = amqp10_msg:set_properties(#{to => To2}, amqp10_msg:new(Tag2, Body2)), + ok = amqp10_client:send_msg(Sender, Msg1), + ok = amqp10_client:send_msg(Sender, Msg2), + ok = wait_for_settled(accepted, Tag1), + ok = wait_for_settled(accepted, Tag2), + + {ok, #{message_count := 2}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = cleanup(Init). + +%% Test v2 target address 'null' and 'to' +%% /exchange/:exchange +%% with varying exchanges. +target_per_message_exchange(Config) -> + XFanout = <<"amq.fanout">>, + XHeaders = <<"amq.headers">>, + To1 = <<"/exchange/", XFanout/binary>>, + To2 = <<"/exchange/", XHeaders/binary>>, + QName = atom_to_binary(?FUNCTION_NAME), + + Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XFanout, <<>>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XHeaders, <<>>, + #{<<"my key">> => true, + <<"x-match">> => {utf8, <<"any">>}}), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null), + ok = wait_for_credit(Sender), + + Tag1 = Body1 = <<1>>, + Tag2 = Body2 = <<2>>, + Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, Body1)), + Msg2 = amqp10_msg:set_application_properties( + #{<<"my key">> => true}, + amqp10_msg:set_properties(#{to => To2}, amqp10_msg:new(Tag2, Body2))), + ok = amqp10_client:send_msg(Sender, Msg1), + ok = amqp10_client:send_msg(Sender, Msg2), + ok = wait_for_settled(accepted, Tag1), + ok = wait_for_settled(accepted, Tag2), + + {ok, #{message_count := 2}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = cleanup(Init). + +%% Test v2 target address 'null' and 'to' +%% /queue/:queue +target_per_message_queue(Config) -> + Q1 = <<"q1">>, + Q2 = <<"q2">>, + Q3 = <<"q3">>, + To1 = <<"/queue/", Q1/binary>>, + To2 = <<"/queue/", Q2/binary>>, + To3 = <<"/queue/", Q3/binary>>, + + Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, Q1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, Q2, #{}), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null), + ok = wait_for_credit(Sender), + + Tag1 = Body1 = <<1>>, + Tag2 = Body2 = <<2>>, + Tag3 = Body3 = <<3>>, + Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, Body1)), + Msg2 = amqp10_msg:set_properties(#{to => To2}, amqp10_msg:new(Tag2, Body2)), + Msg3 = amqp10_msg:set_properties(#{to => To3}, amqp10_msg:new(Tag3, Body3)), + ok = amqp10_client:send_msg(Sender, Msg1), + ok = amqp10_client:send_msg(Sender, Msg2), + ok = amqp10_client:send_msg(Sender, Msg3), + ok = wait_for_settled(accepted, Tag1), + ok = wait_for_settled(accepted, Tag2), + ok = wait_for_settled(released, Tag3), + + {ok, #{message_count := 1}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1), + {ok, #{message_count := 1}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2), + ok = cleanup(Init). + +%% Test v2 target address 'null', but 'to' not set. +target_per_message_unset_to_address(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null), + ok = wait_for_credit(Sender), + + %% Send message with 'to' unset. + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<0>>, <<0>>)), + receive + {amqp10_event, + {session, Session, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:close_connection(Connection). + +bad_v2_addresses() -> + [ + %% valid v1, but bad v2 target addresses + <<"/topic/mytopic">>, + <<"/amq/queue/myqueue">>, + <<"myqueue">>, + <<"/queue">>, + %% bad v2 target addresses + <<"/queue/">>, + <<"/ex/โœ‹"/utf8>>, + <<"/exchange">>, + %% default exchange in v2 target address is disallowed + <<"/exchange/">>, + <<"/exchange/amq.default">>, + <<"/exchange//key/">>, + <<"/exchange//key/mykey">>, + <<"/exchange/amq.default/key/">>, + <<"/exchange/amq.default/key/mykey">> + ]. + +%% Test v2 target address 'null' with an invalid 'to' addresses. +target_per_message_bad_to_address(Config) -> + lists:foreach(fun(Addr) -> + ok = target_per_message_bad_to_address0(Addr, Config) + end, bad_v2_addresses()). + +target_per_message_bad_to_address0(Address, Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null), + ok = wait_for_credit(Sender), + + Msg = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(<<0>>, <<0>>)), + ok = amqp10_client:send_msg(Sender, Msg), + receive + {amqp10_event, + {session, Session, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + description = {utf8, <<"bad 'to' address", _Rest/binary>>}}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE, Address}) + end, + ok = amqp10_client:close_connection(Connection). + +target_per_message_exchange_absent(Config) -> + Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), + XName = <<"๐ŸŽˆ"/utf8>>, + Address = <<"/exchange/", XName/binary>>, + ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null), + ok = wait_for_credit(Sender), + + DTag1 = <<1>>, + Msg1 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag1, <<"m1">>)), + ok = amqp10_client:send_msg(Sender, Msg1), + ok = wait_for_settled(released, DTag1), + + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName), + + DTag2 = <<2>>, + Msg2 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag2, <<"m2">>)), + ok = amqp10_client:send_msg(Sender, Msg2), + ok = wait_for_settled(released, DTag2), + receive {amqp10_event, {link, Sender, {detached, Error}}} -> + ?assertEqual( + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, + description = {utf8, <<"no exchange '", XName/binary, "' in vhost '/'">>}}, + Error) + after 5000 -> ct:fail("server did not close our outgoing link") + end, + + ok = cleanup(Init). + +target_bad_address(Config) -> + %% bad v1 and bad v2 target address + TargetAddr = <<"/qqq/๐ŸŽˆ"/utf8>>, + target_bad_address0(TargetAddr, Config). + +target_bad_v2_address(Config) -> + lists:foreach(fun(Addr) -> + ok = target_bad_address0(Addr, Config) + end, bad_v2_addresses()). + +target_bad_address0(TargetAddress, Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + {ok, _Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddress), + receive + {amqp10_event, + {session, Session, + {ended, + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok + after 5000 -> + Reason = {missing_event, ?LINE, TargetAddress}, + flush(Reason), + ct:fail(Reason) + end, + ok = amqp10_client:close_connection(Connection). + +%% Test v2 source address +%% /queue/:queue +%% where the source queue does not exist. +source_queue_absent(Config) -> + QName = <<"๐ŸŽˆ"/utf8>>, + SourceAddr = <<"/queue/", QName/binary>>, + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + {ok, _Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SourceAddr), + receive + {amqp10_event, + {session, Session, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, + description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary, + "' in vhost '/'">>}}}}} -> ok + after 5000 -> + Reason = {missing_event, ?LINE}, + flush(Reason), + ct:fail(Reason) + end, + ok = amqp10_client:close_connection(Connection). + +source_bad_address(Config) -> + %% bad v1 and bad v2 source address + SourceAddr = <<"/qqq/๐ŸŽˆ"/utf8>>, + source_bad_address0(SourceAddr, Config). + +source_bad_v2_address(Config) -> + %% valid v1, but bad v2 source addresses + SourceAddresses = [<<"/exchange/myroutingkey">>, + <<"/topic/mytopic">>, + <<"/amq/queue/myqueue">>, + <<"myqueue">>], + lists:foreach(fun(Addr) -> + ok = source_bad_address0(Addr, Config) + end, SourceAddresses). + +source_bad_address0(SourceAddress, Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + {ok, _Receiver} = amqp10_client:attach_receiver_link(Session, <<"sender">>, SourceAddress), + receive + {amqp10_event, + {session, Session, + {ended, + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok + after 5000 -> + Reason = {missing_event, ?LINE}, + flush(Reason), + ct:fail(Reason) + end, + ok = amqp10_client:close_connection(Connection). + +init(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"mgmt link pair">>), + {Connection, LinkPair}. + +cleanup({Connection, LinkPair = #link_pair{session = Session}}) -> + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection). + +connection_config(Config) -> + Host = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + #{address => Host, + port => Port, + container_id => <<"my container">>, + sasl => {plain, <<"guest">>, <<"guest">>}}. + +% before we can send messages we have to wait for credit from the server +wait_for_credit(Sender) -> + receive + {amqp10_event, {link, Sender, credited}} -> + flush(?FUNCTION_NAME), + ok + after 5000 -> + flush(?FUNCTION_NAME), + ct:fail(?FUNCTION_NAME) + end. + +wait_for_settled(State, Tag) -> + receive + {amqp10_disposition, {State, Tag}} -> + ok + after 5000 -> + Reason = {?FUNCTION_NAME, State, Tag}, + flush(Reason), + ct:fail(Reason) + end. + +flush(Prefix) -> + receive Msg -> + ct:pal("~tp flushed: ~p~n", [Prefix, Msg]), + flush(Prefix) + after 1 -> + ok + end. diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl index 0aeacd0e1b99..1030ac99e157 100644 --- a/deps/rabbit/test/amqp_auth_SUITE.erl +++ b/deps/rabbit/test/amqp_auth_SUITE.erl @@ -24,19 +24,38 @@ all() -> [ - {group, tests} + {group, address_v1}, + {group, address_v2} ]. groups() -> [ - {tests, [shuffle], + {address_v1, [shuffle], [ + %% authz + v1_attach_target_queue, + v1_attach_source_exchange, + v1_send_to_topic, + v1_send_to_topic_using_subject, + v1_attach_source_topic, + v1_attach_target_internal_exchange, + + %% limits + v1_vhost_queue_limit + ] + }, + {address_v2, [shuffle], + [ + %% authz + attach_source_queue, + attach_target_exchange, + attach_target_topic_exchange, attach_target_queue, - attach_source_exchange, - send_to_topic, - send_to_topic_using_subject, - attach_source_topic, - attach_target_internal_exchange, + target_per_message_exchange, + target_per_message_internal_exchange, + target_per_message_topic, + + %% authn authn_failure_event, sasl_anonymous_success, sasl_none_success, @@ -45,9 +64,10 @@ groups() -> sasl_none_failure, sasl_plain_failure, vhost_absent, + + %% limits vhost_connection_limit, user_connection_limit, - vhost_queue_limit, %% AMQP Management operations against HTTP API v2 declare_exchange, @@ -71,16 +91,24 @@ groups() -> ]. init_per_suite(Config) -> - application:ensure_all_started(amqp10_client), + {ok, _} = application:ensure_all_started(amqp10_client), rabbit_ct_helpers:log_environment(), Config. end_per_suite(Config) -> Config. -init_per_group(_Group, Config0) -> +init_per_group(Group, Config0) -> + PermitV1 = case Group of + address_v1 -> true; + address_v2 -> false + end, + Config1 = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, + [{permit_deprecated_features, + #{amqp_address_v1 => PermitV1}}]}), Config = rabbit_ct_helpers:run_setup_steps( - Config0, + Config1, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()), Vhost = <<"test vhost">>, @@ -107,7 +135,7 @@ end_per_testcase(Testcase, Config) -> ok = clear_permissions(Config), rabbit_ct_helpers:testcase_finished(Config, Testcase). -attach_target_queue(Config) -> +v1_attach_target_queue(Config) -> QName = <<"test queue">>, %% This target address means RabbitMQ will create a queue %% requiring configure access on the queue. @@ -152,7 +180,7 @@ attach_target_queue(Config) -> ok = close_connection_sync(Connection). -attach_source_exchange(Config) -> +v1_attach_source_exchange(Config) -> %% This source address means RabbitMQ will create a queue with a generated name %% prefixed with amq.gen requiring configure access on the queue. %% The queue is bound to the fanout exchange requiring write access on the queue @@ -233,14 +261,14 @@ attach_source_exchange(Config) -> ok = close_connection_sync(Connection). -send_to_topic(Config) -> +v1_send_to_topic(Config) -> TargetAddresses = [<<"/topic/test vhost.test user.a.b">>, <<"/exchange/amq.topic/test vhost.test user.a.b">>], lists:foreach(fun(Address) -> - ok = send_to_topic0(Address, Config) + ok = send_to_topic(Address, Config) end, TargetAddresses). -send_to_topic0(TargetAddress, Config) -> +send_to_topic(TargetAddress, Config) -> User = ?config(test_user, Config), Vhost = ?config(test_vhost, Config), ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, Vhost), @@ -279,7 +307,7 @@ send_to_topic0(TargetAddress, Config) -> ok = amqp10_client:detach_link(Sender2), ok = close_connection_sync(Connection). -send_to_topic_using_subject(Config) -> +v1_send_to_topic_using_subject(Config) -> TargetAddress = <<"/exchange/amq.topic">>, User = ?config(test_user, Config), Vhost = ?config(test_vhost, Config), @@ -317,7 +345,7 @@ send_to_topic_using_subject(Config) -> ok = close_connection_sync(Connection). -attach_source_topic(Config) -> +v1_attach_source_topic(Config) -> %% These source addresses mean RabbitMQ will bind a queue to the default topic %% exchange with binding key 'test vhost.test user.a.b'. %% Therefore, we need read access to that topic. @@ -358,7 +386,7 @@ attach_source_topic0(SourceAddress, Config) -> ok = close_connection_sync(Connection). -attach_target_internal_exchange(Config) -> +v1_attach_target_internal_exchange(Config) -> XName = <<"test exchange">>, Ch = rabbit_ct_client_helpers:open_channel(Config), #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{internal = true, @@ -372,7 +400,7 @@ attach_target_internal_exchange(Config) -> {ok, _} = amqp10_client:attach_sender_link( Session, <<"test-sender">>, Address), ExpectedErr = error_unauthorized( - <<"attach to internal exchange 'test exchange' in vhost '/' is forbidden">>), + <<"forbidden to publish to internal exchange 'test exchange' in vhost '/'">>), receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok after 5000 -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") @@ -382,6 +410,171 @@ attach_target_internal_exchange(Config) -> #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = XName}), ok = rabbit_ct_client_helpers:close_channel(Ch). +attach_source_queue(Config) -> + {Conn, Session, LinkPair} = init_pair(Config), + QName = <<"๐Ÿฟ"/utf8>>, + Address = <<"/queue/", QName/binary>>, + + %% missing read permission to queue + ok = set_permissions(Config, QName, <<>>, <<>>), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + + {ok, _Recv} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address), + ExpectedErr = error_unauthorized( + <<"read access to queue '", QName/binary, + "' in vhost 'test vhost' refused for user 'test user'">>), + receive {amqp10_event, + {session, Session, + {ended, ExpectedErr}}} -> ok + after 5000 -> flush(missing_ended), + ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") + end, + ok = close_connection_sync(Conn). + +attach_target_exchange(Config) -> + XName = <<"amq.fanout">>, + Address1 = <<"/exchange/", XName/binary>>, + Address2 = <<"/exchange/", XName/binary, "/key/some-key", XName/binary>>, + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + + {ok, Session1} = amqp10_client:begin_session_sync(Connection), + {ok, _} = amqp10_client:attach_sender_link(Session1, <<"test-sender">>, Address1), + ExpectedErr = error_unauthorized( + <<"write access to exchange '", XName/binary, + "' in vhost 'test vhost' refused for user 'test user'">>), + receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Session2} = amqp10_client:begin_session_sync(Connection), + {ok, _} = amqp10_client:attach_sender_link(Session2, <<"test-sender">>, Address2), + receive {amqp10_event, {session, Session2, {ended, ExpectedErr}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:close_connection(Connection). + +attach_target_topic_exchange(Config) -> + TargetAddress = <<"/exchange/amq.topic/key/test vhost.test user.a.b">>, + ok = send_to_topic(TargetAddress, Config). + +attach_target_queue(Config) -> + {Conn, Session, LinkPair} = init_pair(Config), + QName = <<"๐Ÿฟ"/utf8>>, + Address = <<"/queue/", QName/binary>>, + + %% missing write permission to default exchange + ok = set_permissions(Config, QName, <<>>, <<>>), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + + {ok, _} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ExpectedErr = error_unauthorized( + <<"write access to exchange 'amq.default' ", + "in vhost 'test vhost' refused for user 'test user'">>), + receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:close_connection(Conn). + +target_per_message_exchange(Config) -> + TargetAddress = null, + To1 = <<"/exchange/amq.fanout">>, + To2 = <<"/queue/q1">>, + %% missing write permission to default exchange + ok = set_permissions(Config, <<>>, <<"amq.fanout">>, <<>>), + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, TargetAddress), + ok = wait_for_credit(Sender), + + %% We have sufficient authorization, but expect RELEASED since no queue is bound. + Tag1 = <<"dtag 1">>, + Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, <<"m1">>)), + ok = amqp10_client:send_msg(Sender, Msg1), + receive {amqp10_disposition, {released, Tag1}} -> ok + after 5000 -> ct:fail(released_timeout) + end, + + %% We don't have sufficient authorization. + Tag2 = <<"dtag 2">>, + Msg2 = amqp10_msg:set_properties(#{to => To2}, amqp10_msg:new(Tag2, <<"m2">>)), + ok = amqp10_client:send_msg(Sender, Msg2), + ExpectedErr = error_unauthorized( + <<"write access to exchange 'amq.default' in " + "vhost 'test vhost' refused for user 'test user'">>), + receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = close_connection_sync(Connection). + +target_per_message_internal_exchange(Config) -> + XName = <<"my internal exchange">>, + XProps = #{internal => true}, + TargetAddress = null, + To = <<"/exchange/", XName/binary>>, + + ok = set_permissions(Config, XName, XName, <<>>), + {Conn1, Session1, LinkPair1} = init_pair(Config), + ok = rabbitmq_amqp_client:declare_exchange(LinkPair1, XName, XProps), + {ok, Sender} = amqp10_client:attach_sender_link_sync(Session1, <<"sender">>, TargetAddress), + ok = wait_for_credit(Sender), + + Tag = <<"tag">>, + Msg = amqp10_msg:set_properties(#{to => To}, amqp10_msg:new(Tag, <<"msg">>, true)), + ok = amqp10_client:send_msg(Sender, Msg), + ExpectedErr = error_unauthorized( + <<"forbidden to publish to internal exchange '", XName/binary, "' in vhost 'test vhost'">>), + receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok + after 5000 -> flush(aaa), + ct:fail({missing_event, ?LINE}) + end, + ok = close_connection_sync(Conn1), + + Init = {_, _, LinkPair2} = init_pair(Config), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair2, XName), + ok = cleanup_pair(Init). + +target_per_message_topic(Config) -> + TargetAddress = null, + To1 = <<"/exchange/amq.topic/key/.a">>, + To2 = <<"/exchange/amq.topic/key/.a.b">>, + User = ?config(test_user, Config), + Vhost = ?config(test_vhost, Config), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, Vhost), + ok = set_topic_permissions(Config, <<"amq.topic">>, <<"^\.a$">>, <<"^$">>), + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, TargetAddress), + ok = wait_for_credit(Sender), + + %% We have sufficient authorization, but expect RELEASED since no queue is bound. + Tag1 = <<"dtag 1">>, + Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, <<"m1">>)), + ok = amqp10_client:send_msg(Sender, Msg1), + receive {amqp10_disposition, {released, Tag1}} -> ok + after 5000 -> ct:fail(released_timeout) + end, + + %% We don't have sufficient authorization. + Tag2 = <<"dtag 2">>, + Msg2 = amqp10_msg:set_properties(#{to => To2}, amqp10_msg:new(Tag2, <<"m2">>)), + ok = amqp10_client:send_msg(Sender, Msg2), + ExpectedErr = error_unauthorized( + <<"write access to topic '.a.b' in exchange 'amq.topic' in " + "vhost 'test vhost' refused for user 'test user'">>), + receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = close_connection_sync(Connection). + authn_failure_event(Config) -> ok = event_recorder:start(Config), @@ -520,7 +713,7 @@ user_connection_limit(Config) -> ok = close_connection_sync(C2), ok = rabbit_ct_broker_helpers:clear_user_limits(Config, DefaultUser, Limit). -vhost_queue_limit(Config) -> +v1_vhost_queue_limit(Config) -> Vhost = proplists:get_value(test_vhost, Config), ok = rabbit_ct_broker_helpers:set_vhost_limit(Config, 0, Vhost, max_queues, 0), QName = <<"q1">>, @@ -566,21 +759,27 @@ declare_exchange(Config) -> ok = close_connection_sync(Conn). delete_exchange(Config) -> - {Conn1, _, LinkPair1} = init_pair(Config), + {Conn, Session1, LinkPair1} = init_pair(Config), XName = <<"๐Ÿ“ฎ"/utf8>>, ok = set_permissions(Config, XName, <<>>, <<>>), ok = rabbitmq_amqp_client:declare_exchange(LinkPair1, XName, #{}), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair1), + ok = amqp10_client:end_session(Session1), + ok = clear_permissions(Config), + + {ok, Session2} = amqp10_client:begin_session_sync(Conn), + {ok, LinkPair2} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session2, <<"pair 2">>), ExpectedErr = error_unauthorized( <<"configure access to exchange '", XName/binary, "' in vhost 'test vhost' refused for user 'test user'">>), ?assertEqual({error, {session_ended, ExpectedErr}}, - rabbitmq_amqp_client:delete_exchange(LinkPair1, XName)), - ok = close_connection_sync(Conn1), + rabbitmq_amqp_client:delete_exchange(LinkPair2, XName)), + ok = close_connection_sync(Conn), ok = set_permissions(Config, XName, <<>>, <<>>), - Init = {_, _, LinkPair2} = init_pair(Config), - ok = rabbitmq_amqp_client:delete_exchange(LinkPair2, XName), + Init = {_, _, LinkPair3} = init_pair(Config), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair3, XName), ok = cleanup_pair(Init). declare_queue(Config) -> @@ -640,16 +839,22 @@ declare_queue_vhost_queue_limit(Config) -> ok = rabbit_ct_broker_helpers:clear_vhost_limit(Config, 0, Vhost). delete_queue(Config) -> - {Conn, _, LinkPair} = init_pair(Config), + {Conn, Session1, LinkPair1} = init_pair(Config), QName = <<"๐Ÿฟ"/utf8>>, ok = set_permissions(Config, QName, <<>>, <<>>), - {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair1, QName, #{}), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair1), + ok = amqp10_client:end_session(Session1), + ok = clear_permissions(Config), + + {ok, Session2} = amqp10_client:begin_session_sync(Conn), + {ok, LinkPair2} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session2, <<"pair 2">>), ExpectedErr = error_unauthorized( <<"configure access to queue '", QName/binary, "' in vhost 'test vhost' refused for user 'test user'">>), ?assertEqual({error, {session_ended, ExpectedErr}}, - rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + rabbitmq_amqp_client:delete_queue(LinkPair2, QName)), ok = close_connection_sync(Conn). purge_queue(Config) -> @@ -743,37 +948,47 @@ bind_to_topic_exchange(Config) -> ok = close_connection_sync(Conn). unbind_queue_source(Config) -> - {Conn, _, LinkPair} = init_pair(Config), + {Conn, Session1, LinkPair1} = init_pair(Config), QName = BindingKey = atom_to_binary(?FUNCTION_NAME), XName = <<"amq.direct">>, ok = set_permissions(Config, QName, QName, XName), - {ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), - ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, BindingKey, #{}), + {ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair1, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair1, QName, XName, BindingKey, #{}), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair1), + ok = amqp10_client:end_session(Session1), %% remove read permission to source exchange ok = set_permissions(Config, QName, QName, <<"^$">>), + + {ok, Session2} = amqp10_client:begin_session_sync(Conn), + {ok, LinkPair2} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session2, <<"pair 2">>), ExpectedErr = error_unauthorized( <<"read access to exchange '", XName/binary, "' in vhost 'test vhost' refused for user 'test user'">>), ?assertEqual({error, {session_ended, ExpectedErr}}, - rabbitmq_amqp_client:unbind_queue(LinkPair, QName, XName, BindingKey, #{})), + rabbitmq_amqp_client:unbind_queue(LinkPair2, QName, XName, BindingKey, #{})), ok = close_connection_sync(Conn). unbind_queue_target(Config) -> - {Conn, _, LinkPair} = init_pair(Config), + {Conn, Session1, LinkPair1} = init_pair(Config), QName = BindingKey = atom_to_binary(?FUNCTION_NAME), XName = <<"amq.direct">>, ok = set_permissions(Config, QName, QName, XName), - {ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), - ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, BindingKey, #{}), + {ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair1, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair1, QName, XName, BindingKey, #{}), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair1), + ok = amqp10_client:end_session(Session1), %% remove write permission to destination queue ok = set_permissions(Config, QName, <<"^$">>, XName), + + {ok, Session2} = amqp10_client:begin_session_sync(Conn), + {ok, LinkPair2} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session2, <<"pair 2">>), ExpectedErr = error_unauthorized( <<"write access to queue '", QName/binary, "' in vhost 'test vhost' refused for user 'test user'">>), ?assertEqual({error, {session_ended, ExpectedErr}}, - rabbitmq_amqp_client:unbind_queue(LinkPair, QName, XName, BindingKey, #{})), + rabbitmq_amqp_client:unbind_queue(LinkPair2, QName, XName, BindingKey, #{})), ok = close_connection_sync(Conn). unbind_from_topic_exchange(Config) -> diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 2c6add8f6892..6cc17e22c3f5 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -918,8 +918,9 @@ server_closes_link_exchange(Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, false)), ok = wait_for_settlement(DTag2, released), %% 2. that the server closes the link, i.e. sends us a DETACH frame. - ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED}, - receive {amqp10_event, {link, Sender, {detached, ExpectedError}}} -> ok + receive {amqp10_event, + {link, Sender, + {detached, #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_NOT_FOUND}}}} -> ok after 5000 -> ct:fail("server did not close our outgoing link") end, ?assertMatch(#{publishers := 0}, get_global_counters(Config)), diff --git a/deps/rabbit/test/per_node_limit_SUITE.erl b/deps/rabbit/test/per_node_limit_SUITE.erl index 00a51b415373..98990c8dc364 100644 --- a/deps/rabbit/test/per_node_limit_SUITE.erl +++ b/deps/rabbit/test/per_node_limit_SUITE.erl @@ -156,7 +156,7 @@ channel_consumers_limit(Config) -> ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost), Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), {ok, Ch} = open_channel(Conn1), - Q = <<"Q">>, Tag = <<"Tag">>, + Q = <<"Q">>, {ok, _} = consume(Ch, Q, <<"Tag1">>), {ok, _} = consume(Ch, Q, <<"Tag2">>), diff --git a/deps/rabbit_common/src/rabbit_routing_parser.erl b/deps/rabbit_common/src/rabbit_routing_parser.erl index fa92d3943bcf..81b26d4a913b 100644 --- a/deps/rabbit_common/src/rabbit_routing_parser.erl +++ b/deps/rabbit_common/src/rabbit_routing_parser.erl @@ -16,11 +16,12 @@ parse_endpoint(Destination) -> parse_endpoint(undefined, AllowAnonymousQueue) -> parse_endpoint("/queue", AllowAnonymousQueue); - -parse_endpoint(Destination, AllowAnonymousQueue) when is_binary(Destination) -> - parse_endpoint(unicode:characters_to_list(Destination), - AllowAnonymousQueue); -parse_endpoint(Destination, AllowAnonymousQueue) when is_list(Destination) -> +parse_endpoint(Destination, AllowAnonymousQueue) + when is_binary(Destination) -> + List = unicode:characters_to_list(Destination), + parse_endpoint(List, AllowAnonymousQueue); +parse_endpoint(Destination, AllowAnonymousQueue) + when is_list(Destination) -> case re:split(Destination, "/", [unicode, {return, list}]) of [Name] -> {ok, {queue, unescape(Name)}}; diff --git a/deps/rabbitmq_amqp_client/test/management_SUITE.erl b/deps/rabbitmq_amqp_client/test/management_SUITE.erl index ca387a53150e..80ce4e2f74e9 100644 --- a/deps/rabbitmq_amqp_client/test/management_SUITE.erl +++ b/deps/rabbitmq_amqp_client/test/management_SUITE.erl @@ -248,8 +248,9 @@ all_management_operations(Config) -> ok = amqp10_client:send_msg(Sender3, amqp10_msg:new(DTag7, <<"not routed">>, false)), ok = wait_for_settlement(DTag7, released), %% 2. that the server closes the link, i.e. sends us a DETACH frame. - ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED}, - receive {amqp10_event, {link, Sender3, {detached, ExpectedError}}} -> ok + receive {amqp10_event, + {link, Sender3, + {detached, #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_NOT_FOUND}}}} -> ok after 5000 -> ct:fail({missing_event, ?LINE}) end, diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl index 3d8a3d5f6e7a..047982df009c 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl @@ -66,14 +66,8 @@ log_environment() -> Vars = lists:sort(fun(A, B) -> A =< B end, os:getenv()), - case file:native_name_encoding() of - latin1 -> - ct:pal(?LOW_IMPORTANCE, "Environment variables:~n~ts", - [[io_lib:format(" ~ts~n", [V]) || V <- Vars]]); - utf8 -> - ct:pal(?LOW_IMPORTANCE, "Environment variables:~n~ts", - [[io_lib:format(" ~ts~n", [V]) || V <- Vars]]) - end. + ct:pal(?LOW_IMPORTANCE, "Environment variables:~n~ts", + [[io_lib:format(" ~ts~n", [V]) || V <- Vars]]). run_setup_steps(Config) -> run_setup_steps(Config, []). diff --git a/deps/rabbitmq_mqtt/BUILD.bazel b/deps/rabbitmq_mqtt/BUILD.bazel index 332753152c01..94df1539d338 100644 --- a/deps/rabbitmq_mqtt/BUILD.bazel +++ b/deps/rabbitmq_mqtt/BUILD.bazel @@ -279,7 +279,7 @@ rabbitmq_integration_suite( ":test_util_beam", ], runtime_deps = [ - "//deps/amqp10_client:erlang_app", + "//deps/rabbitmq_amqp_client:erlang_app", "//deps/rabbitmq_stomp:erlang_app", "//deps/rabbitmq_stream_common:erlang_app", "@emqtt//:erlang_app", diff --git a/deps/rabbitmq_mqtt/Makefile b/deps/rabbitmq_mqtt/Makefile index c6db3954d6d3..cc4ea6b89ef8 100644 --- a/deps/rabbitmq_mqtt/Makefile +++ b/deps/rabbitmq_mqtt/Makefile @@ -45,7 +45,7 @@ export BUILD_WITHOUT_QUIC LOCAL_DEPS = ssl DEPS = ranch rabbit_common rabbit ra amqp10_common -TEST_DEPS = emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_web_mqtt amqp_client rabbitmq_consistent_hash_exchange amqp10_client rabbitmq_stomp rabbitmq_stream +TEST_DEPS = emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_web_mqtt amqp_client rabbitmq_consistent_hash_exchange rabbitmq_amqp_client rabbitmq_stomp rabbitmq_stream dep_ct_helper = git https://github.com/extend/ct_helper.git master dep_emqtt = git https://github.com/rabbitmq/emqtt.git master diff --git a/deps/rabbitmq_mqtt/src/mc_mqtt.erl b/deps/rabbitmq_mqtt/src/mc_mqtt.erl index a5663f64bccf..9add4830cdad 100644 --- a/deps/rabbitmq_mqtt/src/mc_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/mc_mqtt.erl @@ -95,7 +95,8 @@ convert_from(mc_amqp, Sections, Env) -> MqttX = maps:get(mqtt_x, Env, ?DEFAULT_MQTT_EXCHANGE), case Address of <<"/exchange/", - MqttX:(byte_size(MqttX))/binary, "/", + MqttX:(byte_size(MqttX))/binary, + "/key/", RoutingKey/binary>> -> MqttTopic = rabbit_mqtt_util:amqp_to_mqtt(RoutingKey), Props0#{'Response-Topic' => MqttTopic}; @@ -271,7 +272,7 @@ convert_to(mc_amqp, #mqtt_msg{qos = Qos, #{'Response-Topic' := MqttTopic} -> Exchange = maps:get(mqtt_x, Env, ?DEFAULT_MQTT_EXCHANGE), Topic = rabbit_mqtt_util:mqtt_to_amqp(MqttTopic), - Address = <<"/exchange/", Exchange/binary, "/", Topic/binary>>, + Address = <<"/exchange/", Exchange/binary, "/key/", Topic/binary>>, {utf8, Address}; _ -> undefined diff --git a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl index c57e3e632497..ddf2f2a2e919 100644 --- a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl @@ -273,12 +273,12 @@ amqp_to_mqtt_reply_to(_Config) -> Val = amqp_value({utf8, <<"hey">>}), Key = mqtt_x, Env = #{Key => <<"mqtt-topic-exchange">>}, - AmqpProps1 = #'v1_0.properties'{reply_to = {utf8, <<"/exchange/mqtt-topic-exchange/my.routing.key">>}}, + AmqpProps1 = #'v1_0.properties'{reply_to = {utf8, <<"/exchange/mqtt-topic-exchange/key/my.routing.key">>}}, #mqtt_msg{props = Props1} = amqp_to_mqtt([AmqpProps1, Val], Env), ?assertEqual({ok, <<"my/routing/key">>}, maps:find('Response-Topic', Props1)), - AmqpProps2 = #'v1_0.properties'{reply_to = {utf8, <<"/exchange/NON-mqtt-topic-exchange/my.routing.key">>}}, + AmqpProps2 = #'v1_0.properties'{reply_to = {utf8, <<"/exchange/NON-mqtt-topic-exchange/key/my.routing.key">>}}, #mqtt_msg{props = Props2} = amqp_to_mqtt([AmqpProps2, Val]), ?assertEqual(error, maps:find('Response-Topic', Props2)), diff --git a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl index 230c3cc215d3..44d466ac41a2 100644 --- a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl @@ -158,10 +158,14 @@ amqp(Config) -> sasl => {plain, <<"guest">>, <<"guest">>}}, {ok, Connection1} = amqp10_client:open_connection(OpnConf), {ok, Session1} = amqp10_client:begin_session(Connection1), - ReceiverLinkName = <<"test-receiver">>, + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session1, <<"pair">>), + QName = <<"queue for AMQP 1.0 client">>, + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, <<"amq.topic">>, <<"topic.1">>, #{}), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), {ok, Receiver} = amqp10_client:attach_receiver_link( - Session1, ReceiverLinkName, <<"/topic/topic.1">>, unsettled, - configuration), + Session1, <<"test-receiver">>, <<"/queue/", QName/binary>>, + unsettled, configuration), %% MQTT 5.0 to AMQP 1.0 C = connect(ClientId, Config), @@ -208,7 +212,7 @@ amqp(Config) -> #{correlation_id := Correlation, content_type := ContentType, reply_to := ReplyToAddress} = amqp10_msg:properties(Msg1), - ?assertEqual(<<"/exchange/amq.topic/response.topic">>, ReplyToAddress), + ?assertEqual(<<"/exchange/amq.topic/key/response.topic">>, ReplyToAddress), %% Thanks to the 'Payload-Format-Indicator', we get a single utf8 value. ?assertEqual(#'v1_0.amqp_value'{content = {utf8, RequestPayload}}, amqp10_msg:body(Msg1)), @@ -449,9 +453,9 @@ stream(Config) -> amqp10_msg:message_annotations(Msg)), ?assertEqual(#{correlation_id => Correlation, content_type => ContentType, - %% We expect that reply_to contains a valid address, + %% We expect that reply_to contains a valid AMQP 1.0 address, %% and that the topic format got translated from MQTT to AMQP 0.9.1. - reply_to => <<"/exchange/amq.topic/response.topic">>}, + reply_to => <<"/exchange/amq.topic/key/response.topic">>}, amqp10_msg:properties(Msg)), ?assertEqual(#{<<"rabbit๐Ÿ‡"/utf8>> => <<"carrot๐Ÿฅ•"/utf8>>, <<"key">> => <<"val">>}, diff --git a/deps/rabbitmq_shovel_management/test/http_SUITE.erl b/deps/rabbitmq_shovel_management/test/http_SUITE.erl index 1f80fd319b44..b2275db11281 100644 --- a/deps/rabbitmq_shovel_management/test/http_SUITE.erl +++ b/deps/rabbitmq_shovel_management/test/http_SUITE.erl @@ -15,16 +15,16 @@ all() -> [ - {group, non_parallel_tests} + {group, tests} ]. groups() -> [ - {non_parallel_tests, [], [ - amqp10_shovels, - shovels, - dynamic_plugin_enable_disable - ]} + {tests, [], [ + amqp10_shovels, + shovels, + dynamic_plugin_enable_disable + ]} ]. %% ------------------------------------------------------------------- diff --git a/deps/rabbitmq_stomp/test/proxy_protocol_SUITE.erl b/deps/rabbitmq_stomp/test/proxy_protocol_SUITE.erl index 8f734e0124dd..8e72442355a4 100644 --- a/deps/rabbitmq_stomp/test/proxy_protocol_SUITE.erl +++ b/deps/rabbitmq_stomp/test/proxy_protocol_SUITE.erl @@ -35,9 +35,9 @@ init_per_suite(Config) -> {rmq_certspwd, "bunnychow"}, {rabbitmq_ct_tls_verify, verify_none} ]), - MqttConfig = stomp_config(), + StompConfig = stomp_config(), rabbit_ct_helpers:run_setup_steps(Config1, - [ fun(Conf) -> merge_app_env(MqttConfig, Conf) end ] ++ + [ fun(Conf) -> merge_app_env(StompConfig, Conf) end ] ++ rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()). @@ -111,8 +111,8 @@ connection_name() -> {_, Name} = lists:keyfind(name, 1, Values), Name. -merge_app_env(MqttConfig, Config) -> - rabbit_ct_helpers:merge_app_env(Config, MqttConfig). +merge_app_env(StompConfig, Config) -> + rabbit_ct_helpers:merge_app_env(Config, StompConfig). stomp_connect_frame() -> <<"CONNECT\n", diff --git a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl index 6a5dd4151d5a..872424f53224 100644 --- a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl @@ -120,7 +120,7 @@ amqp_credit_single_grant(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), - Address = <<"/amq/queue/", Stream/binary>>, + Address = <<"/queue/", Stream/binary>>, {ok, Receiver} = amqp10_client:attach_receiver_link( Session, <<"test-receiver">>, Address, settled, configuration, #{<<"rabbitmq:stream-offset-spec">> => <<"first">>}), @@ -141,7 +141,7 @@ amqp_credit_multiple_grants(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), - Address = <<"/amq/queue/", Stream/binary>>, + Address = <<"/queue/", Stream/binary>>, {ok, Receiver} = amqp10_client:attach_receiver_link( Session, <<"test-receiver">>, Address, unsettled, configuration, #{<<"rabbitmq:stream-offset-spec">> => <<"first">>}), @@ -246,7 +246,7 @@ amqp_attach_sub_batch(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), - Address = <<"/amq/queue/", Stream/binary>>, + Address = <<"/queue/", Stream/binary>>, {ok, Receiver} = amqp10_client:attach_receiver_link( Session, <<"test-receiver">>, Address, settled, configuration, %% Attach in the middle of an uncompresssed sub batch.