Skip to content

Commit

Permalink
AMQP address v2
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Apr 2, 2024
1 parent e4da70d commit 7846653
Show file tree
Hide file tree
Showing 25 changed files with 1,475 additions and 412 deletions.
6 changes: 5 additions & 1 deletion deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,11 @@ make_target(#{role := {receiver, _Source, _Pid}}) ->
#'v1_0.target'{};
make_target(#{role := {sender, #{address := Address} = Target}}) ->
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
#'v1_0.target'{address = {utf8, Address},
TargetAddr = case is_binary(Address) of
true -> {utf8, Address};
false -> Address
end,
#'v1_0.target'{address = TargetAddr,
durable = {uint, Durable}}.

max_message_size(#{max_message_size := Size})
Expand Down
3 changes: 2 additions & 1 deletion deps/amqp10_client/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ init_per_suite(Config) ->
]).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_helpers:run_teardown_steps(
Config,
[
fun stop_amqp10_client_app/1
]).
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,7 @@ rabbitmq_integration_suite(

rabbitmq_integration_suite(
name = "amqp_auth_SUITE",
shard_count = 2,
additional_beam = [
":test_event_recorder_beam",
],
Expand All @@ -1280,6 +1281,14 @@ rabbitmq_integration_suite(
],
)

rabbitmq_integration_suite(
name = "amqp_address_SUITE",
shard_count = 2,
runtime_deps = [
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

rabbitmq_integration_suite(
name = "amqp_credit_api_v2_SUITE",
runtime_deps = [
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2220,3 +2220,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
)
erlang_bytecode(
name = "amqp_address_SUITE_beam_files",
testonly = True,
srcs = ["test/amqp_address_SUITE.erl"],
outs = ["test/amqp_address_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbitmq_amqp_client:erlang_app"],
)
23 changes: 10 additions & 13 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -146,20 +146,17 @@ init(Proto, Data, Anns) ->
init(Proto, Data, Anns, #{}).

-spec init(protocol(), term(), annotations(), environment()) -> state().
init(Proto, Data, Anns0, Env)
when is_atom(Proto)
andalso is_map(Anns0)
andalso is_map(Env) ->
init(Proto, Data, Anns0, Env) ->
{ProtoData, ProtoAnns} = Proto:init(Data),
Anns = case maps:size(Env) == 0 of
true ->
Anns0;
false ->
Anns0#{env => Env}
end,
Anns1 = case map_size(Env) == 0 of
true -> Anns0;
false -> Anns0#{env => Env}
end,
Anns2 = maps:merge(ProtoAnns, Anns1),
Anns = set_received_at_timestamp(Anns2),
#?MODULE{protocol = Proto,
data = ProtoData,
annotations = set_received_at_timestamp(maps:merge(ProtoAnns, Anns))}.
annotations = Anns}.

-spec size(state()) ->
{MetadataSize :: non_neg_integer(),
Expand Down Expand Up @@ -196,7 +193,7 @@ take_annotation(_Key, BasicMessage) ->
-spec set_annotation(ann_key(), ann_value(), state()) ->
state().
set_annotation(Key, Value, #?MODULE{annotations = Anns} = State) ->
State#?MODULE{annotations = maps:put(Key, Value, Anns)};
State#?MODULE{annotations = Anns#{Key => Value}};
set_annotation(Key, Value, BasicMessage) ->
mc_compat:set_annotation(Key, Value, BasicMessage).

Expand Down Expand Up @@ -313,7 +310,7 @@ property(_Property, _BasicMsg) ->

-spec set_ttl(undefined | non_neg_integer(), state()) -> state().
set_ttl(Value, #?MODULE{annotations = Anns} = State) ->
State#?MODULE{annotations = maps:put(ttl, Value, Anns)};
State#?MODULE{annotations = Anns#{ttl => Value}};
set_ttl(Value, BasicMsg) ->
mc_compat:set_ttl(Value, BasicMsg).

Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ property(user_id, #msg{properties = #'v1_0.properties'{user_id = UserId}}) ->
UserId;
property(subject, #msg{properties = #'v1_0.properties'{subject = Subject}}) ->
Subject;
property(to, #msg{properties = #'v1_0.properties'{to = To}}) ->
To;
property(_Prop, #msg{}) ->
undefined.

Expand Down
3 changes: 1 addition & 2 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1709,8 +1709,7 @@ persist_static_configuration() ->
_ ->
?MAX_MSG_SIZE
end,
ok = persistent_term:put(max_message_size, MaxMsgSize),
ok = rabbit_amqp_management:persist_static_configuration().
ok = persistent_term:put(max_message_size, MaxMsgSize).

persist_static_configuration(Params) ->
App = ?MODULE,
Expand Down
Loading

0 comments on commit 7846653

Please sign in to comment.