Skip to content

Commit

Permalink
Add exchange deletion checks
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Mar 12, 2024
1 parent ad8595f commit 7c0240f
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 101 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ rabbitmq_integration_suite(
":test_event_recorder_beam",
],
runtime_deps = [
"//deps/amqp10_client:erlang_app",
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_access_control.erl
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ check_resource_access(User = #user{username = Username,
check_access(
fun() -> Module:check_resource_access(
auth_user(User, Impl), Resource, Permission, Context) end,
Module, "~s access to ~s refused for user '~s'",
Module, "~s access to ~ts refused for user '~ts'",
[Permission, rabbit_misc:rs(Resource), Username]);
(_, Else) -> Else
end, ok, Modules).
Expand All @@ -202,7 +202,7 @@ check_topic_access(User = #user{username = Username,
check_access(
fun() -> Module:check_topic_access(
auth_user(User, Impl), Resource, Permission, Context) end,
Module, "~s access to topic '~s' in exchange ~s refused for user '~s'",
Module, "~s access to topic '~ts' in exchange ~s refused for user '~ts'",
[Permission, maps:get(routing_key, Context), rabbit_misc:rs(Resource), Username]);
(_, Else) -> Else
end, ok, Modules).
Expand Down
28 changes: 12 additions & 16 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,22 +148,19 @@ handle_http_req(<<"DELETE">>,
{<<"200">>, [], RespPayload};

handle_http_req(<<"DELETE">>,
[<<"exchanges">>, XNameBinQ],
[<<"exchanges">>, XNameBinQuoted],
_Query,
null,
Vhost,
#user{username = Username},
User = #user{username = Username},
_ConnPid) ->
XNameBin = uri_string:unquote(XNameBinQ),
XNameBin = uri_string:unquote(XNameBinQuoted),
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
ok = case rabbit_exchange:delete(XName, false, Username) of
ok ->
ok;
{error, not_found} ->
ok
%% %% TODO return deletion failure
%% {error, in_use} ->
end,
ok = prohibit_cr_lf(XNameBin),
ok = prohibit_default_exchange(XNameBin),
ok = prohibit_reserved_amq(XName),
ok = check_resource_access(XName, configure, User),
_ = rabbit_exchange:delete(XName, false, Username),
{<<"204">>, [], null};

handle_http_req(<<"POST">>,
Expand Down Expand Up @@ -457,13 +454,12 @@ prohibit_reserved_amq(#resource{}) ->
rabbit_types:user()) -> ok.
check_resource_access(Resource, Perm, User) ->
try rabbit_access_control:check_resource_access(User, Resource, Perm, #{})
catch exit:#amqp_error{name = not_allowed} ->
catch exit:#amqp_error{name = access_refused,
explanation = Explanation} ->
%% For authorization failures, let's be more strict: Close the entire
%% AMQP session instead of only returning a HTTP Status Code 403.
%% AMQP session instead of only returning an HTTP Status Code 403.
rabbit_amqp_util:protocol_error(
?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"~s access refused for user '~ts' to ~ts",
[Perm, User, rabbit_misc:rs(Resource)])
?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Explanation, [])
end.

-spec throw(binary(), io:format(), [term()]) -> no_return().
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqp_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-spec protocol_error(term(), io:format(), [term()]) ->
no_return().
protocol_error(Condition, Msg, Args) ->
Description = list_to_binary(lists:flatten(io_lib:format(Msg, Args))),
Description = unicode:characters_to_binary(lists:flatten(io_lib:format(Msg, Args))),
Reason = #'v1_0.error'{condition = Condition,
description = {utf8, Description}},
exit(Reason).
46 changes: 45 additions & 1 deletion deps/rabbit/test/amqp_auth_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ groups() ->
vhost_absent,
vhost_connection_limit,
user_connection_limit,
vhost_queue_limit
vhost_queue_limit,

%% AMQP Management operations against HTTP API v2
declare_exchange,
delete_exchange
]
}
].
Expand Down Expand Up @@ -537,6 +541,46 @@ vhost_queue_limit(Config) ->
ok = close_connection_sync(C2),
ok = rabbit_ct_broker_helpers:clear_vhost_limit(Config, 0, Vhost).

declare_exchange(Config) ->
{Conn, _Session, LinkPair} = init_pair(Config),
XName = <<"📮"/utf8>>,
ExpectedErr = error_unauthorized(
<<"configure access to exchange '", XName/binary,
"' in vhost 'test vhost' refused for user 'test user'">>),
?assertEqual({error, {session_ended, ExpectedErr}},
rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{})),
ok = close_connection_sync(Conn).

delete_exchange(Config) ->
{Conn1, _, LinkPair1} = init_pair(Config),
XName = <<"📮"/utf8>>,
ok = set_permissions(Config, XName, <<>>, <<>>),
ok = rabbitmq_amqp_client:declare_exchange(LinkPair1, XName, #{}),
ok = clear_permissions(Config),
ExpectedErr = error_unauthorized(
<<"configure access to exchange '", XName/binary,
"' in vhost 'test vhost' refused for user 'test user'">>),
?assertEqual({error, {session_ended, ExpectedErr}},
rabbitmq_amqp_client:delete_exchange(LinkPair1, XName)),
ok = close_connection_sync(Conn1),

ok = set_permissions(Config, XName, <<>>, <<>>),
Init = {_, _, LinkPair2} = init_pair(Config),
ok = rabbitmq_amqp_client:delete_exchange(LinkPair2, XName),
ok = cleanup_pair(Init).

init_pair(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"mgmt link pair">>),
{Connection, Session, LinkPair}.

cleanup_pair({Connection, Session, LinkPair}) ->
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).

connection_config(Config) ->
Vhost = ?config(test_vhost, Config),
connection_config(Config, Vhost).
Expand Down
7 changes: 4 additions & 3 deletions deps/rabbitmq_amqp_client/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ def all_srcs(name = "all_srcs"):
srcs = ["src/rabbitmq_amqp_client.erl"],
)
filegroup(name = "private_hdrs")
filegroup(name = "public_hdrs", srcs = [
"include/rabbitmq_amqp_client.hrl",
])
filegroup(
name = "public_hdrs",
srcs = ["include/rabbitmq_amqp_client.hrl"],
)
filegroup(name = "priv")
filegroup(
name = "license_files",
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbitmq_amqp_client/include/rabbitmq_amqp_client.hrl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-record(link_pair, {outgoing_link :: amqp10_client:link_ref(),
-record(link_pair, {session :: pid(),
outgoing_link :: amqp10_client:link_ref(),
incoming_link :: amqp10_client:link_ref()}).
-type link_pair() :: #link_pair{}.
96 changes: 48 additions & 48 deletions deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

-export[attach_management_link_pair_sync/2,
detach_management_link_pair_sync/1,
declare_queue/2,
declare_exchange/2,
declare_queue/3,
declare_exchange/3,
bind_queue/5,
bind_exchange/5,
unbind_queue/5,
Expand Down Expand Up @@ -61,7 +61,8 @@ attach_management_link_pair_sync(Session, Name) ->
{ok, IncomingRef} ?= attach(Session, IncomingAttachArgs),
ok ?= await_attached(OutgoingRef),
ok ?= await_attached(IncomingRef),
{ok, #link_pair{outgoing_link = OutgoingRef,
{ok, #link_pair{session = Session,
outgoing_link = OutgoingRef,
incoming_link = IncomingRef}}
end.

Expand Down Expand Up @@ -117,28 +118,26 @@ await_detached(Ref) ->
{error, timeout}
end.

-spec declare_queue(link_pair(), queue_properties()) ->
-spec declare_queue(link_pair(), binary(), queue_properties()) ->
{ok, map()} | {error, term()}.
declare_queue(LinkPair, QueueProperties) ->
{QName, Body0} = maps:fold(
fun(name, V, {undefined, L}) when is_binary(V) ->
{V, L};
(durable, V, {N, L}) when is_boolean(V) ->
{N, [{{utf8, <<"durable">>}, {boolean, V}} | L]};
(exclusive, V, {N, L}) when is_boolean(V) ->
{N, [{{utf8, <<"exclusive">>}, {boolean, V}} | L]};
(auto_delete, V, {N, L}) when is_boolean(V) ->
{N, [{{utf8, <<"auto_delete">>}, {boolean, V}} | L]};
(arguments, V, {N, L0}) ->
KVList = maps:fold(
fun(K = <<"x-", _/binary>>, TaggedVal = {T, _}, L)
when is_atom(T) ->
[{{utf8, K}, TaggedVal} | L]
end, [], V),
{N, [{{utf8, <<"arguments">>}, {map, KVList}} | L0]}
end, {undefined, []}, QueueProperties),
declare_queue(LinkPair, QueueName, QueueProperties) ->
Body0 = maps:fold(
fun(durable, V, L) when is_boolean(V) ->
[{{utf8, <<"durable">>}, {boolean, V}} | L];
(exclusive, V, L) when is_boolean(V) ->
[{{utf8, <<"exclusive">>}, {boolean, V}} | L];
(auto_delete, V, L) when is_boolean(V) ->
[{{utf8, <<"auto_delete">>}, {boolean, V}} | L];
(arguments, V, L) ->
KVList = maps:fold(
fun(K = <<"x-", _/binary>>, TaggedVal = {T, _}, L0)
when is_atom(T) ->
[{{utf8, K}, TaggedVal} | L0]
end, [], V),
[{{utf8, <<"arguments">>}, {map, KVList}} | L]
end, [], QueueProperties),
Body = {map, Body0},
QNameQuoted = uri_string:quote(QName),
QNameQuoted = uri_string:quote(QueueName),
Props = #{subject => <<"PUT">>,
to => <<"/queues/", QNameQuoted/binary>>},

Expand Down Expand Up @@ -316,31 +315,29 @@ purge_or_delete_queue(LinkPair, QueueName, PathSuffix) ->
Err
end.

-spec declare_exchange(link_pair(), exchange_properties()) ->
-spec declare_exchange(link_pair(), binary(), exchange_properties()) ->
ok | {error, term()}.
declare_exchange(LinkPair, ExchangeProperties) ->
{XName, Body0} = maps:fold(
fun(name, V, {undefined, L}) when is_binary(V) ->
{V, L};
(type, V, {N, L}) when is_binary(V) ->
{N, [{{utf8, <<"type">>}, {utf8, V}} | L]};
(durable, V, {N, L}) when is_boolean(V) ->
{N, [{{utf8, <<"durable">>}, {boolean, V}} | L]};
(auto_delete, V, {N, L}) when is_boolean(V) ->
{N, [{{utf8, <<"auto_delete">>}, {boolean, V}} | L]};
(internal, V, {N, L}) when is_boolean(V) ->
{N, [{{utf8, <<"internal">>}, {boolean, V}} | L]};
(arguments, V, {N, L0}) ->
KVList = maps:fold(
fun(K = <<"x-", _/binary>>, TaggedVal = {T, _}, L)
when is_atom(T) ->
[{{utf8, K}, TaggedVal} | L]
end, [], V),
{N, [{{utf8, <<"arguments">>}, {map, KVList}} | L0]}
end, {undefined, []}, ExchangeProperties),
declare_exchange(LinkPair, ExchangeName, ExchangeProperties) ->
Body0 = maps:fold(
fun(type, V, L) when is_binary(V) ->
[{{utf8, <<"type">>}, {utf8, V}} | L];
(durable, V, L) when is_boolean(V) ->
[{{utf8, <<"durable">>}, {boolean, V}} | L];
(auto_delete, V, L) when is_boolean(V) ->
[{{utf8, <<"auto_delete">>}, {boolean, V}} | L];
(internal, V, L) when is_boolean(V) ->
[{{utf8, <<"internal">>}, {boolean, V}} | L];
(arguments, V, L) ->
KVList = maps:fold(
fun(K = <<"x-", _/binary>>, TaggedVal = {T, _}, L0)
when is_atom(T) ->
[{{utf8, K}, TaggedVal} | L0]
end, [], V),
[{{utf8, <<"arguments">>}, {map, KVList}} | L]
end, [], ExchangeProperties),
Body = {map, Body0},

XNameQuoted = uri_string:quote(XName),
XNameQuoted = uri_string:quote(ExchangeName),
Props = #{subject => <<"PUT">>,
to => <<"/exchanges/", XNameQuoted/binary>>},

Expand Down Expand Up @@ -377,7 +374,8 @@ delete_exchange(LinkPair, ExchangeName) ->

-spec request(link_pair(), amqp10_msg:amqp10_properties(), amqp10_prim()) ->
{ok, Response :: amqp10_msg:amqp10_msg()} | {error, term()}.
request(#link_pair{outgoing_link = OutgoingLink,
request(#link_pair{session = Session,
outgoing_link = OutgoingLink,
incoming_link = IncomingLink}, Properties, Body) ->
MessageId = message_id(),
Properties1 = Properties#{message_id => {binary, MessageId},
Expand All @@ -389,9 +387,11 @@ request(#link_pair{outgoing_link = OutgoingLink,
ok ->
receive {amqp10_msg, IncomingLink, Response} ->
#{correlation_id := MessageId} = amqp10_msg:properties(Response),
{ok, Response}
{ok, Response};
{amqp10_event, {session, Session, {ended, Reason}}} ->
{error, {session_ended, Reason}}
after ?TIMEOUT ->
{error, response_timeout}
{error, timeout}
end;
Err ->
Err
Expand Down
Loading

0 comments on commit 7c0240f

Please sign in to comment.