Skip to content

Commit

Permalink
WIP AMQP Management
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Feb 20, 2024
1 parent 8ace41c commit a90d26e
Show file tree
Hide file tree
Showing 27 changed files with 1,592 additions and 175 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
!/deps/amqp10_common/
!/deps/oauth2_client/
!/deps/rabbitmq_amqp1_0/
!/deps/rabbitmq_amqp_client/
!/deps/rabbitmq_auth_backend_cache/
!/deps/rabbitmq_auth_backend_http/
!/deps/rabbitmq_auth_backend_ldap/
Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ load(

APP_NAME = "amqp10_client"

APP_DESCRIPTION = "AMQP 1.0 client from the RabbitMQ Project"
APP_DESCRIPTION = "AMQP 1.0 client"

APP_MODULE = "amqp10_client_app"

Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_client/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
PROJECT = amqp10_client
PROJECT_DESCRIPTION = AMQP 1.0 client from the RabbitMQ Project
PROJECT_DESCRIPTION = AMQP 1.0 client
PROJECT_MOD = amqp10_client_app

define PROJECT_APP_EXTRA_KEYS
Expand Down
10 changes: 4 additions & 6 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
parse_uri/1
]).

-define(DEFAULT_TIMEOUT, 5000).

-type snd_settle_mode() :: amqp10_client_session:snd_settle_mode().
-type rcv_settle_mode() :: amqp10_client_session:rcv_settle_mode().

Expand Down Expand Up @@ -134,7 +132,7 @@ begin_session(Connection) when is_pid(Connection) ->
-spec begin_session_sync(pid()) ->
supervisor:startchild_ret() | session_timeout.
begin_session_sync(Connection) when is_pid(Connection) ->
begin_session_sync(Connection, ?DEFAULT_TIMEOUT).
begin_session_sync(Connection, ?TIMEOUT).

%% @doc Synchronously begins an amqp10 session using 'Connection'.
%% This is a convenience function that awaits the 'begun' event
Expand Down Expand Up @@ -191,7 +189,7 @@ attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) ->
{ok, Ref};
{amqp10_event, {link, Ref, {detached, Err}}} ->
{error, Err}
after ?DEFAULT_TIMEOUT -> link_timeout
after ?TIMEOUT -> link_timeout
end.

%% @doc Attaches a sender link to a target.
Expand Down Expand Up @@ -357,7 +355,7 @@ stop_receiver_link(#link_ref{role = receiver,
send_msg(#link_ref{role = sender, session = Session,
link_handle = Handle}, Msg0) ->
Msg = amqp10_msg:set_handle(Handle, Msg0),
amqp10_client_session:transfer(Session, Msg, ?DEFAULT_TIMEOUT).
amqp10_client_session:transfer(Session, Msg, ?TIMEOUT).

%% @doc Accept a message on a the link referred to be the 'LinkRef'.
-spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok.
Expand All @@ -376,7 +374,7 @@ settle_msg(LinkRef, Msg, Settlement) ->
%% Flows a single link credit then awaits delivery or timeout.
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.
get_msg(LinkRef) ->
get_msg(LinkRef, ?DEFAULT_TIMEOUT).
get_msg(LinkRef, ?TIMEOUT).

%% @doc Get a single message from a link.
%% Flows a single link credit then awaits delivery or timeout.
Expand Down
74 changes: 43 additions & 31 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
diff/2]).

-define(MAX_SESSION_WINDOW_SIZE, 65535).
-define(DEFAULT_TIMEOUT, 5000).
-define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
-define(INITIAL_OUTGOING_DELIVERY_ID, ?UINT_MAX).
%% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
Expand Down Expand Up @@ -149,7 +148,7 @@
reader :: pid(),
socket :: amqp10_client_connection:amqp10_socket() | undefined,
links = #{} :: #{output_handle() => #link{}},
link_index = #{} :: #{link_name() => output_handle()},
link_index = #{} :: #{{link_role(), link_name()} => output_handle()},
link_handle_index = #{} :: #{input_handle() => output_handle()},
next_link_handle = 0 :: output_handle(),
early_attach_requests :: [term()],
Expand All @@ -172,7 +171,7 @@

-spec begin_sync(pid()) -> supervisor:startchild_ret().
begin_sync(Connection) ->
begin_sync(Connection, ?DEFAULT_TIMEOUT).
begin_sync(Connection, ?TIMEOUT).

-spec begin_sync(pid(), non_neg_integer()) ->
supervisor:startchild_ret() | session_timeout.
Expand Down Expand Up @@ -302,33 +301,37 @@ mapped(cast, #'v1_0.end'{error = Err}, State) ->
mapped(cast, #'v1_0.attach'{name = {utf8, Name},
initial_delivery_count = IDC,
handle = {uint, InHandle},
role = PeerRoleBool,
max_message_size = MaybeMaxMessageSize},
#state{links = Links, link_index = LinkIndex,
link_handle_index = LHI} = State0) ->

#{Name := OutHandle} = LinkIndex,
OurRoleBool = not PeerRoleBool,
OurRole = boolean_to_role(OurRoleBool),
LinkIndexKey = {OurRole, Name},
#{LinkIndexKey := OutHandle} = LinkIndex,
#{OutHandle := Link0} = Links,
ok = notify_link_attached(Link0),

{DeliveryCount, MaxMessageSize} =
case Link0 of
#link{role = sender,
#link{role = sender = OurRole,
delivery_count = DC} ->
MSS = case MaybeMaxMessageSize of
{ulong, S} when S > 0 -> S;
_ -> undefined
end,
{DC, MSS};
#link{role = receiver,
#link{role = receiver = OurRole,
max_message_size = MSS} ->
{unpack(IDC), MSS}
end,
Link = Link0#link{state = attached,
input_handle = InHandle,
delivery_count = DeliveryCount,
max_message_size = MaxMessageSize},
State = State0#state{links = Links#{OutHandle => Link},
link_index = maps:remove(Name, LinkIndex),
State = State0#state{links = Links#{OutHandle := Link},
link_index = maps:remove(LinkIndexKey, LinkIndex),
link_handle_index = LHI#{InHandle => OutHandle}},
{keep_state, State};
mapped(cast, #'v1_0.detach'{handle = {uint, InHandle},
Expand Down Expand Up @@ -648,8 +651,8 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->

make_source(#{role := {sender, _}}) ->
#'v1_0.source'{};
make_source(#{role := {receiver, #{address := Address} = Target, _Pid}, filter := Filter}) ->
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
TranslatedFilter = translate_filters(Filter),
#'v1_0.source'{address = {utf8, Address},
durable = {uint, Durable},
Expand Down Expand Up @@ -743,35 +746,34 @@ detach_with_error_cond(Link = #link{output_handle = OutHandle}, State, Cond) ->
ok = send(Detach, State),
Link#link{state = detach_sent}.

send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
#state{next_link_handle = OutHandle0, links = Links,
send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
#state{next_link_handle = OutHandle0, links = Links,
link_index = LinkIndex} = State) ->

Source = make_source(Args),
Target = make_target(Args),
Properties = amqp10_client_types:make_properties(Args),

{LinkTarget, RoleAsBool, InitialDeliveryCount, MaxMessageSize} =
case Role of
{LinkTarget, InitialDeliveryCount, MaxMessageSize} =
case RoleTuple of
{receiver, _, Pid} ->
{{pid, Pid}, true, undefined, max_message_size(Args)};
{{pid, Pid}, undefined, max_message_size(Args)};
{sender, #{address := TargetAddr}} ->
{TargetAddr, false, uint(?INITIAL_DELIVERY_COUNT), undefined}
end,

{OutHandle, NextLinkHandle} =
case Args of
#{handle := Handle} ->
%% Client app provided link handle.
%% Really only meant for integration tests.
{Handle, OutHandle0};
_ ->
{OutHandle0, OutHandle0 + 1}
{TargetAddr, uint(?INITIAL_DELIVERY_COUNT), undefined}
end,

{OutHandle, NextLinkHandle} = case Args of
#{handle := Handle} ->
%% Client app provided link handle.
%% Really only meant for integration tests.
{Handle, OutHandle0};
_ ->
{OutHandle0, OutHandle0 + 1}
end,
Role = element(1, RoleTuple),
% create attach performative
Attach = #'v1_0.attach'{name = {utf8, Name},
role = RoleAsBool,
role = role_to_boolean(Role),
handle = {uint, OutHandle},
source = Source,
properties = Properties,
Expand All @@ -782,12 +784,12 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
max_message_size = MaxMessageSize},
ok = Send(Attach, State),

LinkRef = make_link_ref(element(1, Role), self(), OutHandle),
Ref = make_link_ref(Role, self(), OutHandle),
Link = #link{name = Name,
ref = LinkRef,
ref = Ref,
output_handle = OutHandle,
state = attach_sent,
role = element(1, Role),
role = Role,
notify = FromPid,
auto_flow = never,
target = LinkTarget,
Expand All @@ -796,7 +798,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},

{State#state{links = Links#{OutHandle => Link},
next_link_handle = NextLinkHandle,
link_index = LinkIndex#{Name => OutHandle}}, LinkRef}.
link_index = LinkIndex#{{Role, Name} => OutHandle}}, Ref}.

-spec handle_session_flow(#'v1_0.flow'{}, #state{}) -> #state{}.
handle_session_flow(#'v1_0.flow'{next_incoming_id = MaybeNII,
Expand Down Expand Up @@ -1090,6 +1092,16 @@ sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}.
reason(undefined) -> normal;
reason(Other) -> Other.

role_to_boolean(sender) ->
?AMQP_ROLE_SENDER;
role_to_boolean(receiver) ->
?AMQP_ROLE_RECEIVER.

boolean_to_role(?AMQP_ROLE_SENDER) ->
sender;
boolean_to_role(?AMQP_ROLE_RECEIVER) ->
receiver.

format_status(Status = #{data := Data0}) ->
#state{channel = Channel,
remote_channel = RemoteChannel,
Expand Down
15 changes: 11 additions & 4 deletions deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
%%
-module(amqp10_msg).

-include_lib("amqp10_common/include/amqp10_types.hrl").

-export([from_amqp_records/1,
to_amqp_records/1,
% "read" api
Expand Down Expand Up @@ -256,12 +258,12 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) ->
new(DeliveryTag, Body, Settled) when is_binary(Body) ->
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, 0}},
message_format = {uint, ?MESSAGE_FORMAT}},
body = [#'v1_0.data'{content = Body}]};
new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, 0}},
message_format = {uint, ?MESSAGE_FORMAT}},
body = Body}.

%% @doc Create a new settled amqp10 message using the specified delivery tag
Expand Down Expand Up @@ -322,8 +324,13 @@ set_properties(Props, #amqp10_msg{properties = undefined} = Msg) ->
set_properties(Props, Msg#amqp10_msg{properties = #'v1_0.properties'{}});
set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
% TODO many fields are `any` types and we need to try to type tag them
P = maps:fold(fun(message_id, V, Acc) when is_binary(V) ->
% message_id can be any type but we restrict it here
P = maps:fold(fun(message_id, {T, _V} = TypeVal, Acc) when T =:= ulong orelse
T =:= uuid orelse
T =:= binary orelse
T =:= uf8 ->
Acc#'v1_0.properties'{message_id = TypeVal};
(message_id, V, Acc) when is_binary(V) ->
%% backward compat clause
Acc#'v1_0.properties'{message_id = utf8(V)};
(user_id, V, Acc) when is_binary(V) ->
Acc#'v1_0.properties'{user_id = {binary, V}};
Expand Down
7 changes: 7 additions & 0 deletions deps/amqp10_common/include/amqp10_types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,10 @@
-type transfer_number() :: sequence_no().
% [2.8.10]
-type sequence_no() :: uint().

% [2.8.1]
-define(AMQP_ROLE_SENDER, false).
-define(AMQP_ROLE_RECEIVER, true).

% [3.2.16]
-define(MESSAGE_FORMAT, 0).
3 changes: 3 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqp1_0.erl",
"src/rabbit_amqp_management.erl",
"src/rabbit_amqp_reader.erl",
"src/rabbit_amqp_session.erl",
"src/rabbit_amqp_session_sup.erl",
Expand Down Expand Up @@ -316,6 +317,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqp1_0.erl",
"src/rabbit_amqp_management.erl",
"src/rabbit_amqp_reader.erl",
"src/rabbit_amqp_session.erl",
"src/rabbit_amqp_session_sup.erl",
Expand Down Expand Up @@ -600,6 +602,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqp1_0.erl",
"src/rabbit_amqp_management.erl",
"src/rabbit_amqp_reader.erl",
"src/rabbit_amqp_session.erl",
"src/rabbit_amqp_session_sup.erl",
Expand Down
3 changes: 0 additions & 3 deletions deps/rabbit/include/rabbit_amqp.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
%% [2.8.19]
-define(MIN_MAX_FRAME_1_0_SIZE, 512).

-define(SEND_ROLE, false).
-define(RECV_ROLE, true).

%% for rabbit_event user_authentication_success and user_authentication_failure
-define(AUTH_EVENT_KEYS,
[name,
Expand Down
Loading

0 comments on commit a90d26e

Please sign in to comment.