Skip to content

Commit

Permalink
Return queue-prefix and topic-prefix in open properties
Browse files Browse the repository at this point in the history
`queue-prefix` and `topic-prefix` properties of the `open` frame are not part of the AMQP spec.
However, let's include these properties because they are also returned by
ActiveMQ and Solace and understood by some client libs (e.g. ActiveMQ NMS.AMQP Client).

Closes #12531
  • Loading branch information
ansd committed Oct 18, 2024
1 parent 73f118f commit 44d4714
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 3 deletions.
10 changes: 9 additions & 1 deletion deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,15 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
server_properties() ->
Props0 = rabbit_reader:server_properties(amqp_1_0),
Props1 = [{{symbol, K}, {utf8, V}} || {K, longstr, V} <- Props0],
Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1],
Props = [
{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}},
%% queue-prefix and topic-prefix are not part of the AMQP spec.
%% However, we include these properties because they are also returned by
%% ActiveMQ and Solace and understood by some client libs (e.g. ActiveMQ NMS.AMQP Client).
%% https://github.com/rabbitmq/rabbitmq-server/issues/12531
{{symbol, <<"queue-prefix">>}, {utf8, <<"/queues/">>}},
{{symbol, <<"topic-prefix">>}, {utf8, <<"/exchanges/amq.topic/">>}}
] ++ Props1,
{map, Props}.

%%--------------------------------------------------------------------------
Expand Down
49 changes: 47 additions & 2 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ groups() ->
tcp_back_pressure_rabbitmq_internal_flow_quorum_queue,
session_max_per_connection,
link_max_per_session,
reserved_annotation
reserved_annotation,
open_properties_target_address_prefix
]},

{cluster_size_3, [shuffle],
Expand Down Expand Up @@ -4762,7 +4763,7 @@ dead_letter_headers_exchange(Config) ->
#{arguments => #{<<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>},
<<"x-message-ttl">> => {ulong, 0}}}),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.headers">>, <<>>,
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.headers">>, <<>>,
#{<<"my key">> => {uint, 5},
<<"x-my key">> => {uint, 6},
<<"x-match">> => {utf8, <<"all-with-x">>}}),
Expand Down Expand Up @@ -5944,6 +5945,50 @@ reserved_annotation(Config) ->
end,
ok = close_connection_sync(Connection).

%% Test case for
%% https://github.com/rabbitmq/rabbitmq-server/issues/12531
%% We pretend here to be unaware of RabbitMQ's target address format.
%% We learn the address format from the properties field in the open frame.
open_properties_target_address_prefix(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Topic = <<"topic1">>,

OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{notify_with_performative => true},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{QueuePrefix, TopicPrefix} =
receive {amqp10_event, {connection, Connection, {opened, #'v1_0.open'{properties = {map, KVList}}}}} ->
{_, {utf8, QPref}} = proplists:lookup({symbol, <<"queue-prefix">>}, KVList),
{_, {utf8, TPref}} = proplists:lookup({symbol, <<"topic-prefix">>}, KVList),
{QPref, TPref}
after 5000 -> ct:fail({missing_event, ?LINE})
end,

{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),

{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, <<"amq.topic">>, Topic, #{}),

{ok, QSender} = amqp10_client:attach_sender_link(
Session, <<"queue sender">>, <<QueuePrefix/binary, QName/binary>>),
{ok, TSender} = amqp10_client:attach_sender_link(
Session, <<"topic sender">>, <<TopicPrefix/binary, Topic/binary>>),
wait_for_credit(QSender),
wait_for_credit(TSender),
ok = amqp10_client:send_msg(QSender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
ok = amqp10_client:send_msg(TSender, amqp10_msg:new(<<"t2">>, <<"m2">>)),
ok = wait_for_accepted(<<"t1">>),
ok = wait_for_accepted(<<"t2">>),

ok = amqp10_client:detach_link(QSender),
ok = amqp10_client:detach_link(TSender),
?assertMatch({ok, #{message_count := 2}},
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

%% internal
%%

Expand Down

0 comments on commit 44d4714

Please sign in to comment.