From de804d1fa75418e16fd4232a48fb3b2711fd4964 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 12 Nov 2024 12:16:15 +0100 Subject: [PATCH] Support publishing AMQP 1.0 to Event Exchange ## What? Prior to this commit, the `rabbitmq_event_exchange` internally published always AMQP 0.9.1 messages to the `amq.rabbitmq.event` topic exchange. This commit allows users to configure the plugin to publish AMQP 1.0 messages instead. ## Why? Prior to this commit, when an AMQP 1.0 client consumed events, event properties that are lists were omitted. For example property `client_properties` of event `connection.created` or property `arguments` of event `queue.created` were omitted because of the following sequence: 1. The event exchange plugins listens for all kind of internal events. 2. The event exchange plugin re-publishes all events as AMQP 0.9.1 message to the event exchange. 3. Later, when an AMQP 1.0 client consumes this message, the broker must translate the message from AMQP 0.9.1 to AMQP 1.0. 4. This translation follows the rules outlined in https://www.rabbitmq.com/docs/conversions#amqpl-amqp 5. Specifically, in this table the row before the last one describes the rule we're hitting here. It says that if the AMQP 0.9.1 header value is not an `x-` prefixed header and its value is an array or table, then this header is not converted. That's because AMQP 1.0 application-properties must be simple types as mandated in https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application-properties ## How? The user can configure the plugin as follows to have the plugin internally publish AMQP 1.0 messages: ``` event_exchange.protocol = amqp_1_0 ``` To support complex types such as lists, the plugin sets all event properties as AMQP 1.0 message-annotations. The plugin prefixes all message annotation keys with `x-opt-` to comply with the AMQP 1.0 spec. ## Alternative Design An alternative design would have been to format all event properties e.g. as JSON within the message body. However, this breaks routing on specific event property values via a headers exchange. ## Documentation https://github.com/rabbitmq/rabbitmq-website/pull/2129 --- deps/rabbit/src/mc_amqpl.erl | 11 +- deps/rabbit/src/mc_util.erl | 13 + deps/rabbit/test/mc_unit_SUITE.erl | 62 +-- deps/rabbitmq_event_exchange/Makefile | 6 + deps/rabbitmq_event_exchange/README.md | 151 +----- .../schema/rabbitmq_event_exchange.schema | 4 + .../src/rabbit_exchange_type_event.erl | 194 ++++++-- .../rabbitmq_event_exchange.snippets | 47 +- .../test/system_SUITE.erl | 437 +++++++++++++----- release-notes/4.1.0.md | 5 + 10 files changed, 554 insertions(+), 376 deletions(-) diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 936a1b130d89..cac190e2cb5e 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -43,7 +43,6 @@ -define(AMQP10_FOOTER, <<"x-amqp-1.0-footer">>). -define(PROTOMOD, rabbit_framing_amqp_0_9_1). -define(CLASS_ID, 60). --define(LONGSTR_UTF8_LIMIT, 4096). -opaque state() :: #content{}. @@ -682,19 +681,13 @@ wrap(_Type, undefined) -> wrap(Type, Val) -> {Type, Val}. -from_091(longstr, V) - when is_binary(V) andalso - byte_size(V) =< ?LONGSTR_UTF8_LIMIT -> - %% if a longstr is longer than 4096 bytes we just assume it is binary - %% it _may_ still be valid utf8 but checking this for every longstr header - %% value is going to be excessively slow - case mc_util:is_utf8_no_null(V) of +from_091(longstr, V) -> + case mc_util:is_utf8_no_null_limited(V) of true -> {utf8, V}; false -> {binary, V} end; -from_091(longstr, V) -> {binary, V}; from_091(long, V) -> {long, V}; from_091(unsignedbyte, V) -> {ubyte, V}; from_091(short, V) -> {short, V}; diff --git a/deps/rabbit/src/mc_util.erl b/deps/rabbit/src/mc_util.erl index 9ec7928de9b7..d19f17e7d92b 100644 --- a/deps/rabbit/src/mc_util.erl +++ b/deps/rabbit/src/mc_util.erl @@ -3,6 +3,7 @@ -include("mc.hrl"). -export([is_valid_shortstr/1, + is_utf8_no_null_limited/1, is_utf8_no_null/1, uuid_to_urn_string/1, urn_string_to_uuid/1, @@ -12,12 +13,24 @@ is_x_header/1 ]). +-define(UTF8_SCAN_LIMIT, 4096). + -spec is_valid_shortstr(term()) -> boolean(). is_valid_shortstr(Bin) when ?IS_SHORTSTR_LEN(Bin) -> is_utf8_no_null(Bin); is_valid_shortstr(_) -> false. +-spec is_utf8_no_null_limited(term()) -> boolean(). +is_utf8_no_null_limited(Bin) + when byte_size(Bin) =< ?UTF8_SCAN_LIMIT -> + is_utf8_no_null(Bin); +is_utf8_no_null_limited(_Term) -> + %% If longer than 4096 bytes, just assume it's not UTF-8. + %% It _may_ still be valid UTF-8 but checking this + %% on the hot path is going to be excessively slow. + false. + -spec is_utf8_no_null(term()) -> boolean(). is_utf8_no_null(Term) -> utf8_scan(Term, fun (C) -> C > 0 end). diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index f8d10462e629..1949763c5c76 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -313,34 +313,37 @@ amqpl_amqp_bin_amqpl(_Config) -> %% incoming amqpl converted to amqp, serialized / deserialized then converted %% back to amqpl. %% simulates a legacy message published then consumed to a stream - Props = #'P_basic'{content_type = <<"text/plain">>, - content_encoding = <<"gzip">>, - headers = [{<<"a-stream-offset">>, long, 99}, - {<<"a-string">>, longstr, <<"a string">>}, - {<<"a-bool">>, bool, false}, - {<<"a-unsignedbyte">>, unsignedbyte, 1}, - {<<"a-unsignedshort">>, unsignedshort, 1}, - {<<"a-unsignedint">>, unsignedint, 1}, - {<<"a-signedint">>, signedint, 1}, - {<<"a-timestamp">>, timestamp, 1}, - {<<"a-double">>, double, 1.0}, - {<<"a-float">>, float, 1.0}, - {<<"a-void">>, void, undefined}, - {<<"a-binary">>, binary, <<"data">>}, - {<<"a-array">>, array, [{long, 1}, {long, 2}]}, - {<<"x-stream-filter">>, longstr, <<"apple">>} - ], - delivery_mode = 2, - priority = 98, - correlation_id = <<"corr">> , - reply_to = <<"reply-to">>, - expiration = <<"1">>, - message_id = <<"msg-id">>, - timestamp = 99, - type = <<"45">>, - user_id = <<"banana">>, - app_id = <<"rmq">> - }, + String5k = binary:copy(<<"x">>, 5000), + Props = #'P_basic'{ + content_type = <<"text/plain">>, + content_encoding = <<"gzip">>, + headers = [{<<"a-stream-offset">>, long, 99}, + {<<"a-string">>, longstr, <<"a string">>}, + {<<"a-very-long-string">>, longstr, String5k}, + {<<"a-bool">>, bool, false}, + {<<"a-unsignedbyte">>, unsignedbyte, 1}, + {<<"a-unsignedshort">>, unsignedshort, 1}, + {<<"a-unsignedint">>, unsignedint, 1}, + {<<"a-signedint">>, signedint, 1}, + {<<"a-timestamp">>, timestamp, 1}, + {<<"a-double">>, double, 1.0}, + {<<"a-float">>, float, 1.0}, + {<<"a-void">>, void, undefined}, + {<<"a-binary">>, binary, <<"data">>}, + {<<"a-array">>, array, [{long, 1}, {long, 2}]}, + {<<"x-stream-filter">>, longstr, <<"apple">>} + ], + delivery_mode = 2, + priority = 98, + correlation_id = <<"corr">> , + reply_to = <<"reply-to">>, + expiration = <<"1">>, + message_id = <<"msg-id">>, + timestamp = 99, + type = <<"45">>, + user_id = <<"banana">>, + app_id = <<"rmq">> + }, Content = #content{properties = Props, payload_fragments_rev = [<<"data">>]}, Msg = mc:init(mc_amqpl, Content, annotations()), @@ -404,6 +407,9 @@ amqpl_amqp_bin_amqpl(_Config) -> ?assertEqual({long, 99}, Get(<<"a-stream-offset">>, AP10)), ?assertEqual({utf8, <<"a string">>}, Get(<<"a-string">>, AP10)), + %% We expect that a very long string is not scanned for valid UTF-8 + %% and instead directly turned into a binary. + ?assertEqual({binary, String5k}, Get(<<"a-very-long-string">>, AP10)), ?assertEqual(false, Get(<<"a-bool">>, AP10)), ?assertEqual({ubyte, 1}, Get(<<"a-unsignedbyte">>, AP10)), ?assertEqual({ushort, 1}, Get(<<"a-unsignedshort">>, AP10)), diff --git a/deps/rabbitmq_event_exchange/Makefile b/deps/rabbitmq_event_exchange/Makefile index fdac1be67e6e..72d6367dd744 100644 --- a/deps/rabbitmq_event_exchange/Makefile +++ b/deps/rabbitmq_event_exchange/Makefile @@ -1,6 +1,12 @@ PROJECT = rabbitmq_event_exchange PROJECT_DESCRIPTION = Event Exchange Type +define PROJECT_ENV + [ + {protocol, amqp_0_9_1} + ] +endef + define PROJECT_APP_EXTRA_KEYS {broker_version_requirements, []} endef diff --git a/deps/rabbitmq_event_exchange/README.md b/deps/rabbitmq_event_exchange/README.md index 1380a4d30f72..4f2aab35e699 100644 --- a/deps/rabbitmq_event_exchange/README.md +++ b/deps/rabbitmq_event_exchange/README.md @@ -1,154 +1,7 @@ # RabbitMQ Event Exchange -## Overview - -This plugin exposes the internal RabbitMQ event mechanism as messages that clients -can consume. It's useful -if you want to keep track of certain events, e.g. when queues, exchanges, bindings, users, -connections, channels are created and deleted. This plugin filters out stats -events, so you are almost certainly going to get better results using -the management plugin for stats. - -## How it Works - -It declares a topic exchange called `amq.rabbitmq.event` **in the default -virtual host**. All events are published to this exchange with routing -keys like 'exchange.created', 'binding.deleted' etc, so you can -subscribe to only the events you're interested in. - -The exchange behaves similarly to 'amq.rabbitmq.log': everything gets -published there; if you don't trust a user with the information that -gets published, don't allow them access. - - -## Installation - -This plugin ships with RabbitMQ. Like with all other plugins, it must be -enabled before it can be used: - -```bash -[sudo] rabbitmq-plugins enable rabbitmq_event_exchange -``` - -## Event format - -Each event has various properties associated with it. These are -translated into AMQP 0-9-1 data encoding and inserted in the message headers. The -**message body is always blank**. - -## Events - -So far RabbitMQ and related plugins emit events with the following routing keys: - -### RabbitMQ Broker - -Queue, Exchange and Binding events: - - * `queue.deleted` - * `queue.created` - * `exchange.created` - * `exchange.deleted` - * `binding.created` - * `binding.deleted` - -Connection and Channel events: - - * `connection.created` - * `connection.closed` - * `channel.created` - * `channel.closed` - -Consumer events: - - * `consumer.created` - * `consumer.deleted` - -Policy and Parameter events: - - * `policy.set` - * `policy.cleared` - * `parameter.set` - * `parameter.cleared` - -Virtual host events: - - * `vhost.created` - * `vhost.deleted` - * `vhost.limits.set` - * `vhost.limits.cleared` - -User related events: - - * `user.authentication.success` - * `user.authentication.failure` - * `user.created` - * `user.deleted` - * `user.password.changed` - * `user.password.cleared` - * `user.tags.set` - -Permission events: - - * `permission.created` - * `permission.deleted` - * `topic.permission.created` - * `topic.permission.deleted` - -Alarm events: - - * `alarm.set` - * `alarm.cleared` - -### Shovel Plugin - -Worker events: - - * `shovel.worker.status` - * `shovel.worker.removed` - -### Federation Plugin - -Link events: - - * `federation.link.status` - * `federation.link.removed` - -## Example - -There is a usage example using the Java client in `examples/java`. - - -## Configuration - - * `rabbitmq_event_exchange.vhost`: what vhost should the `amq.rabbitmq.event` exchange be declared in. Default: `rabbit.default_vhost` (`<<"/">>`). - - -## Uninstalling - -If you want to remove the exchange which this plugin creates, first -disable the plugin and restart the broker. Then you can delete the exchange, -e.g. with : - - rabbitmqctl eval 'rabbit_exchange:delete(rabbit_misc:r(<<"/">>, exchange, <<"amq.rabbitmq.event">>), false, <<"username">>).' - - -## Building from Source - -Building is no different from [building other RabbitMQ plugins](https://www.rabbitmq.com/plugin-development.html). - -TL;DR: - - git clone https://github.com.com/rabbitmq/rabbitmq-public-umbrella.git umbrella - cd umbrella - make co - make up BRANCH=stable - cd deps - git clone https://github.com/rabbitmq/rabbitmq-event-exchange.git rabbitmq_event_exchange - cd rabbitmq_event_exchange - make dist - +See the [website](https://www.rabbitmq.com/docs/event-exchange) for documentation. ## License -Released under the Mozilla Public License 2.0, -the same as RabbitMQ. +Released under the Mozilla Public License 2.0, the same as RabbitMQ. diff --git a/deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema b/deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema index c8b2efe5acdd..62de27e820c7 100644 --- a/deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema +++ b/deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema @@ -5,3 +5,7 @@ fun(Conf) -> list_to_binary(cuttlefish:conf_get("event_exchange.vhost", Conf)) end}. + +{mapping, "event_exchange.protocol", "rabbitmq_event_exchange.protocol", [ + {datatype, {enum, [amqp_0_9_1, amqp_1_0]}} +]}. diff --git a/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl b/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl index 70251406b20c..b79508b8b8d0 100644 --- a/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl +++ b/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl @@ -11,6 +11,8 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). +-include_lib("rabbit/include/mc.hrl"). -include("rabbit_event_exchange.hrl"). -export([register/0, unregister/0]). @@ -20,8 +22,11 @@ -export([fmt_proplist/1]). %% testing --record(state, {vhost, - has_any_bindings +-define(APP_NAME, rabbitmq_event_exchange). + +-record(state, {protocol :: amqp_0_9_1 | amqp_1_0, + vhost :: rabbit_types:vhost(), + has_any_bindings :: boolean() }). -rabbit_boot_step({rabbit_event_exchange, @@ -65,41 +70,35 @@ exchange(VHost) -> %%---------------------------------------------------------------------------- init([]) -> + {ok, Protocol} = application:get_env(?APP_NAME, protocol), VHost = get_vhost(), X = rabbit_misc:r(VHost, exchange, ?EXCH_NAME), HasBindings = case rabbit_binding:list_for_source(X) of - [] -> false; - _ -> true - end, - {ok, #state{vhost = VHost, + [] -> false; + _ -> true + end, + {ok, #state{protocol = Protocol, + vhost = VHost, has_any_bindings = HasBindings}}. handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event(_, #state{has_any_bindings = false} = State) -> - {ok, State}; -handle_event(#event{type = Type, - props = Props, - timestamp = TS, - reference = none}, #state{vhost = VHost} = State) -> - _ = case key(Type) of - ignore -> ok; - Key -> - Props2 = [{<<"timestamp_in_ms">>, TS} | Props], - PBasic = #'P_basic'{delivery_mode = 2, - headers = fmt_proplist(Props2), - %% 0-9-1 says the timestamp is a - %% "64 bit POSIX - %% timestamp". That's second - %% resolution, not millisecond. - timestamp = erlang:convert_time_unit( - TS, milli_seconds, seconds)}, - Content = rabbit_basic:build_content(PBasic, <<>>), - XName = exchange(VHost), - {ok, Msg} = mc_amqpl:message(XName, Key, Content), - rabbit_queue_type:publish_at_most_once(XName, Msg) - end, - {ok, State}; +handle_event(#event{type = Type, + props = Props, + reference = none, + timestamp = Timestamp}, + #state{protocol = Protocol, + vhost = VHost, + has_any_bindings = true} = State) -> + case key(Type) of + ignore -> + {ok, State}; + Key -> + XName = exchange(VHost), + Mc = mc_init(Protocol, XName, Key, Props, Timestamp), + _ = rabbit_queue_type:publish_at_most_once(XName, Mc), + {ok, State} + end; handle_event(_Event, State) -> {ok, State}. @@ -207,9 +206,109 @@ key(S) -> Tokens -> list_to_binary(string:join(Tokens, ".")) end. +get_vhost() -> + case application:get_env(?APP_NAME, vhost) of + undefined -> + {ok, V} = application:get_env(rabbit, default_vhost), + V; + {ok, V} -> + V + end. + +mc_init(amqp_1_0, #resource{name = XNameBin}, Key, Props, Timestamp) -> + Sections = [#'v1_0.message_annotations'{content = props_to_message_annotations(Props)}, + #'v1_0.properties'{creation_time = {timestamp, Timestamp}}, + #'v1_0.data'{content = <<>>}], + Payload = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]), + Anns = #{?ANN_EXCHANGE => XNameBin, + ?ANN_ROUTING_KEYS => [Key]}, + mc:init(mc_amqp, Payload, Anns); +mc_init(amqp_0_9_1, XName, Key, Props0, TimestampMillis) -> + Props = [{<<"timestamp_in_ms">>, TimestampMillis} | Props0], + Headers = fmt_proplist(Props), + TimestampSecs = erlang:convert_time_unit(TimestampMillis, millisecond, second), + PBasic = #'P_basic'{delivery_mode = 2, + headers = Headers, + timestamp = TimestampSecs}, + Content = rabbit_basic:build_content(PBasic, <<>>), + {ok, Mc} = mc_amqpl:message(XName, Key, Content), + Mc. + +props_to_message_annotations(Props) -> + KVList = lists:foldl( + fun({K, #resource{virtual_host = Vhost, name = Name}}, Acc) -> + Ann0 = {to_message_annotation_key(K), {utf8, Name}}, + Ann1 = {{symbol, <<"x-opt-vhost">>}, {utf8, Vhost}}, + [Ann0, Ann1 | Acc]; + ({K, V}, Acc) -> + Ann = {to_message_annotation_key(K), + to_message_annotation_val(V)}, + [Ann | Acc] + end, [], Props), + lists:reverse(KVList). + +to_message_annotation_key(Key) -> + Key1 = to_binary(Key), + Pattern = try persistent_term:get(cp_underscore) + catch error:badarg -> + Cp = binary:compile_pattern(<<"_">>), + ok = persistent_term:put(cp_underscore, Cp), + Cp + end, + Key2 = binary:replace(Key1, Pattern, <<"-">>, [global]), + Key3 = case Key2 of + <<"x-", _/binary>> -> + Key2; + _ -> + <<"x-opt-", Key2/binary>> + end, + {symbol, Key3}. + +to_message_annotation_val(V) + when is_boolean(V) -> + {boolean, V}; +to_message_annotation_val(V) + when is_atom(V) -> + {utf8, atom_to_binary(V, utf8)}; +to_message_annotation_val(V) + when is_binary(V) -> + case mc_util:is_utf8_no_null_limited(V) of + true -> + {utf8, V}; + false -> + {binary, V} + end; +to_message_annotation_val(V) + when is_integer(V) -> + {long, V}; +to_message_annotation_val(V) + when is_number(V) -> + %% AMQP double and Erlang float are both 64-bit. + {double, V}; +to_message_annotation_val(V) + when is_pid(V) -> + {utf8, to_pid(V)}; +to_message_annotation_val([{Key, _} | _] = Proplist) + when is_atom(Key) orelse + is_binary(Key) -> + {map, lists:map(fun({K, V}) -> + {{utf8, to_binary(K)}, + to_message_annotation_val(V)} + end, Proplist)}; +to_message_annotation_val([{Key, Type, _Value} | _] = Table) + when is_binary(Key) andalso + is_atom(Type) -> + %% Looks like an AMQP 0.9.1 table + mc_amqpl:from_091(table, Table); +to_message_annotation_val(V) + when is_list(V) -> + {list, [to_message_annotation_val(Val) || Val <- V]}; +to_message_annotation_val(V) -> + {utf8, fmt_other(V)}. + fmt_proplist(Props) -> lists:foldl(fun({K, V}, Acc) -> - case fmt(a2b(K), V) of + case fmt(to_binary(K), V) of L when is_list(L) -> lists:append(L, Acc); T -> [T | Acc] end @@ -226,11 +325,8 @@ fmt(K, V) when is_number(V) -> {K, float, V}; fmt(K, V) when is_binary(V) -> {K, longstr, V}; fmt(K, [{_, _}|_] = Vs) -> {K, table, fmt_proplist(Vs)}; fmt(K, Vs) when is_list(Vs) -> {K, array, [fmt(V) || V <- Vs]}; -fmt(K, V) when is_pid(V) -> {K, longstr, - list_to_binary(rabbit_misc:pid_to_string(V))}; -fmt(K, V) -> {K, longstr, - list_to_binary( - rabbit_misc:format("~1000000000p", [V]))}. +fmt(K, V) when is_pid(V) -> {K, longstr, to_pid(V)}; +fmt(K, V) -> {K, longstr, fmt_other(V)}. %% Exactly the same as fmt/2, duplicated only for performance issues fmt(true) -> {bool, true}; @@ -241,20 +337,16 @@ fmt(V) when is_number(V) -> {float, V}; fmt(V) when is_binary(V) -> {longstr, V}; fmt([{_, _}|_] = Vs) -> {table, fmt_proplist(Vs)}; fmt(Vs) when is_list(Vs) -> {array, [fmt(V) || V <- Vs]}; -fmt(V) when is_pid(V) -> {longstr, - list_to_binary(rabbit_misc:pid_to_string(V))}; -fmt(V) -> {longstr, - list_to_binary( - rabbit_misc:format("~1000000000p", [V]))}. +fmt(V) when is_pid(V) -> {longstr, to_pid(V)}; +fmt(V) -> {longstr, fmt_other(V)}. -a2b(A) when is_atom(A) -> atom_to_binary(A, utf8); -a2b(B) when is_binary(B) -> B. +fmt_other(V) -> + list_to_binary(rabbit_misc:format("~1000000000p", [V])). -get_vhost() -> - case application:get_env(rabbitmq_event_exchange, vhost) of - undefined -> - {ok, V} = application:get_env(rabbit, default_vhost), - V; - {ok, V} -> - V - end. +to_binary(Val) when is_atom(Val) -> + atom_to_binary(Val); +to_binary(Val) when is_binary(Val) -> + Val. + +to_pid(Val) -> + list_to_binary(rabbit_misc:pid_to_string(Val)). diff --git a/deps/rabbitmq_event_exchange/test/config_schema_SUITE_data/rabbitmq_event_exchange.snippets b/deps/rabbitmq_event_exchange/test/config_schema_SUITE_data/rabbitmq_event_exchange.snippets index 2fceed017a96..70eb722731b9 100644 --- a/deps/rabbitmq_event_exchange/test/config_schema_SUITE_data/rabbitmq_event_exchange.snippets +++ b/deps/rabbitmq_event_exchange/test/config_schema_SUITE_data/rabbitmq_event_exchange.snippets @@ -1,19 +1,34 @@ [ - {virtual_host1, - "event_exchange.vhost = /", - [ - {rabbitmq_event_exchange, [ - {vhost, <<"/">>} - ]} - ], [rabbitmq_event_exchange] - }, +{virtual_host1, + "event_exchange.vhost = /", + [{rabbitmq_event_exchange, [ + {vhost, <<"/">>} + ]}], + [rabbitmq_event_exchange] +}, - {virtual_host2, - "event_exchange.vhost = dev", - [ - {rabbitmq_event_exchange, [ - {vhost, <<"dev">>} - ]} - ], [rabbitmq_event_exchange] - } +{virtual_host2, + "event_exchange.vhost = dev", + [{rabbitmq_event_exchange, [ + {vhost, <<"dev">>} + ]} + ], + [rabbitmq_event_exchange] +}, + +{protocol_amqp, + "event_exchange.protocol = amqp_1_0", + [{rabbitmq_event_exchange, [ + {protocol, amqp_1_0} + ]}], + [rabbitmq_event_exchange] +}, + +{protocol_amqpl, + "event_exchange.protocol = amqp_0_9_1", + [{rabbitmq_event_exchange, [ + {protocol, amqp_0_9_1} + ]}], + [rabbitmq_event_exchange] +} ]. diff --git a/deps/rabbitmq_event_exchange/test/system_SUITE.erl b/deps/rabbitmq_event_exchange/test/system_SUITE.erl index 4610378131ea..07002efab805 100644 --- a/deps/rabbitmq_event_exchange/test/system_SUITE.erl +++ b/deps/rabbitmq_event_exchange/test/system_SUITE.erl @@ -13,74 +13,83 @@ -compile(export_all). --define(TAG, <<"user_who_performed_action">>). - all() -> [ - {group, amqp}, - {group, amqpl} + {group, amqp_1_0}, + {group, amqp_0_9_1} ]. groups() -> [ - {amqp, [shuffle], + {amqp_1_0, [shuffle], + shared_tests() ++ [ - amqp_connection + amqp_1_0_amqp_connection, + amqp_1_0_queue_created, + headers_exchange ]}, - {amqpl, [], + {amqp_0_9_1, [], + shared_tests() ++ [ - queue_created, - authentication, - audit_queue, - audit_exchange, - audit_exchange_internal_parameter, - audit_binding, - audit_vhost, - audit_vhost_deletion, - audit_channel, - audit_connection, - audit_direct_connection, - audit_consumer, - audit_parameter, - audit_policy, - audit_vhost_limit, - audit_user, - audit_user_password, - audit_user_tags, - audit_permission, - audit_topic_permission, - resource_alarm, + amqp_0_9_1_amqp_connection, + amqp_0_9_1_queue_created, unregister ]} ]. +shared_tests() -> + [ + authentication_success, + authentication_failure, + audit_queue, + audit_exchange, + audit_exchange_internal_parameter, + audit_binding, + audit_vhost, + audit_vhost_deletion, + audit_channel, + audit_connection, + audit_direct_connection, + audit_consumer, + audit_parameter, + audit_policy, + audit_vhost_limit, + audit_user, + audit_user_password, + audit_user_tags, + audit_permission, + audit_topic_permission, + resource_alarm + ]. + %% ------------------------------------------------------------------- %% Testsuite setup/teardown. %% ------------------------------------------------------------------- init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), rabbit_ct_helpers:log_environment(), - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, ?MODULE} - ]), - Config2 = rabbit_ct_helpers:run_setup_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - Config2. + Config. end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()). - -init_per_group(amqp, Config) -> - {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), - Config; -init_per_group(_, Config) -> Config. -end_per_group(_, Config) -> - Config. +init_per_group(Group, Config) -> + Config1 = rabbit_ct_helpers:merge_app_env( + Config, + {rabbitmq_event_exchange, [{protocol, Group}]}), + Config2 = rabbit_ct_helpers:set_config( + Config1, [{rmq_nodename_suffix, ?MODULE}]), + rabbit_ct_helpers:run_setup_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). @@ -88,34 +97,52 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). - %% ------------------------------------------------------------------- %% Testsuite cases %% ------------------------------------------------------------------- -%% Only really tests that we're not completely broken. -queue_created(Config) -> - Now = os:system_time(seconds), - - Ch = declare_event_queue(Config, <<"queue.*">>), +amqp_1_0_queue_created(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Headers = queue_created(QName, Config), + ?assertEqual({longstr, QName}, + rabbit_misc:table_lookup(Headers, <<"x-opt-name">>)), + ?assertEqual({table, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + rabbit_misc:table_lookup(Headers, <<"x-opt-arguments">>)). - #'queue.declare_ok'{queue = Q2} = - amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), +amqp_0_9_1_queue_created(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Headers = queue_created(QName,Config), + ?assertEqual({longstr, QName}, + rabbit_misc:table_lookup(Headers, <<"name">>)), + {array, QArgs} = rabbit_misc:table_lookup(Headers, <<"arguments">>), + %% Ideally, instead of a longstr containing the formatted Erlang term, + %% we should expect a table. + ?assertEqual(<<"{<<\"x-queue-type\">>,longstr,<<\"classic\">>}">>, + proplists:get_value(longstr, QArgs)). + +queue_created(QName, Config) -> + Ch = declare_event_queue(Config, <<"queue.created">>), + + Now = os:system_time(second), + #'queue.declare_ok'{} = amqp_channel:call( + Ch, #'queue.declare'{ + queue = QName, + exclusive = true, + arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}] + }), receive {#'basic.deliver'{routing_key = Key}, - #amqp_msg{props = #'P_basic'{headers = Headers, timestamp = TS}}} -> + #amqp_msg{props = #'P_basic'{headers = Headers, + timestamp = TS}}} -> %% timestamp is within the last 5 seconds - true = ((TS - Now) =< 5), - <<"queue.created">> = Key, - {longstr, Q2} = rabbit_misc:table_lookup(Headers, <<"name">>) - end, - - rabbit_ct_client_helpers:close_channel(Ch), - ok. - + ?assert(((TS - Now) =< 5)), + ?assertEqual(<<"queue.created">>, Key), + rabbit_ct_client_helpers:close_channel(Ch), + Headers + end. -authentication(Config) -> +authentication_success(Config) -> Ch = declare_event_queue(Config, <<"user.#">>), Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0), @@ -123,14 +150,41 @@ authentication(Config) -> {#'basic.deliver'{routing_key = Key}, #amqp_msg{props = #'P_basic'{headers = Headers}}} -> <<"user.authentication.success">> = Key, - undefined = rabbit_misc:table_lookup(Headers, <<"vhost">>), - {longstr, _PeerHost} = rabbit_misc:table_lookup(Headers, <<"peer_host">>), - {bool, false} = rabbit_misc:table_lookup(Headers, <<"ssl">>) + {Vhost, PeerHost, Ssl} = + case group_name(Config) of + amqp_0_9_1 -> + {<<"vhost">>, <<"peer_host">>, <<"ssl">>}; + amqp_1_0 -> + {<<"x-opt-vhost">>, <<"x-opt-peer-host">>, <<"x-opt-ssl">>} + end, + undefined = rabbit_misc:table_lookup(Headers, Vhost), + {longstr, _PeerHost} = rabbit_misc:table_lookup(Headers, PeerHost), + {bool, false} = rabbit_misc:table_lookup(Headers, Ssl) + after 5000 -> missing_deliver end, - amqp_connection:close(Conn2), - rabbit_ct_client_helpers:close_channel(Ch), - ok. + ok = amqp_connection:close(Conn2), + ok = rabbit_ct_client_helpers:close_channel(Ch). + +authentication_failure(Config) -> + Ch = declare_event_queue(Config, <<"user.authentication.*">>), + {error, _} = rabbit_ct_client_helpers:open_unmanaged_connection( + Config, 0, <<"fake user">>, <<"fake password">>), + + receive + {#'basic.deliver'{routing_key = Key}, + #amqp_msg{props = #'P_basic'{headers = Headers}}} -> + ?assertEqual(<<"user.authentication.failure">>, Key), + User = case group_name(Config) of + amqp_0_9_1 -> <<"name">>; + amqp_1_0 -> <<"x-opt-name">> + end, + ?assertEqual({longstr, <<"fake user">>}, + rabbit_misc:table_lookup(Headers, User)) + after 5000 -> missing_deliver + end, + + ok = rabbit_ct_client_helpers:close_channel(Ch). audit_queue(Config) -> Ch = declare_event_queue(Config, <<"queue.*">>), @@ -138,13 +192,12 @@ audit_queue(Config) -> #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), - User = proplists:get_value(rmq_username, Config), - receive_user_in_event(<<"queue.created">>, User), + receive_user_in_event(<<"queue.created">>, Config), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}), - receive_user_in_event(<<"queue.deleted">>, User), + receive_user_in_event(<<"queue.deleted">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -157,13 +210,12 @@ audit_exchange(Config) -> amqp_channel:call(Ch, #'exchange.declare'{exchange = X, type = <<"topic">>}), - User = proplists:get_value(rmq_username, Config), - receive_user_in_event(<<"exchange.created">>, User), + receive_user_in_event(<<"exchange.created">>, Config), #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = X}), - receive_user_in_event(<<"exchange.deleted">>, User), + receive_user_in_event(<<"exchange.deleted">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -171,8 +223,7 @@ audit_exchange(Config) -> audit_binding(Config) -> Ch = declare_event_queue(Config, <<"binding.*">>), %% The binding to the event exchange itself is the first queued event - User = proplists:get_value(rmq_username, Config), - receive_user_in_event(<<"binding.created">>, User), + receive_user_in_event(<<"binding.created">>, Config), #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), @@ -181,26 +232,34 @@ audit_binding(Config) -> amqp_channel:call(Ch, #'queue.bind'{queue = Q, exchange = <<"amq.direct">>, routing_key = <<"test">>}), - receive_user_in_event(<<"binding.created">>, User), + receive_user_in_event(<<"binding.created">>, Config), #'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{queue = Q, exchange = <<"amq.direct">>, routing_key = <<"test">>}), - receive_user_in_event(<<"binding.deleted">>, User), + receive_user_in_event(<<"binding.deleted">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. audit_vhost(Config) -> + Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)), Ch = declare_event_queue(Config, <<"vhost.*">>), User = <<"Bugs Bunny">>, rabbit_ct_broker_helpers:add_vhost(Config, 0, <<"test-vhost">>, User), - receive_user_in_event(<<"vhost.created">>, User), + Headers = receive_user_in_event(<<"vhost.created">>, User, Config), + + Key = case group_name(Config) of + amqp_0_9_1 -> <<"cluster_state">>; + amqp_1_0 -> <<"x-opt-cluster-state">> + end, + ?assertEqual({table, [{Node, longstr, <<"running">>}]}, + rabbit_misc:table_lookup(Headers, Key)), rabbit_ct_broker_helpers:delete_vhost(Config, 0, <<"test-vhost">>, User), - receive_user_in_event(<<"vhost.deleted">>, User), + receive_user_in_event(<<"vhost.deleted">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -218,72 +277,81 @@ audit_vhost_deletion(Config) -> %% The user that creates the queue is the connection one, not the vhost creator #'queue.declare_ok'{queue = _Q} = amqp_channel:call(Ch2, #'queue.declare'{}), - receive_user_in_event(<<"queue.created">>, ConnUser), + receive_user_in_event(<<"queue.created">>, ConnUser, Config), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch2), %% Validate that the user deleting the queue is the one used to delete the vhost, %% not the original user that created the queue (the connection one) rabbit_ct_broker_helpers:delete_vhost(Config, 0, Vhost, User), - receive_user_in_event(<<"queue.deleted">>, User), + receive_user_in_event(<<"queue.deleted">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. audit_channel(Config) -> Ch = declare_event_queue(Config, <<"channel.*">>), - User = proplists:get_value(rmq_username, Config), Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), {ok, Ch2} = amqp_connection:open_channel(Conn), - receive_user_in_event(<<"channel.created">>, User), + receive_user_in_event(<<"channel.created">>, Config), rabbit_ct_client_helpers:close_channel(Ch2), - receive_user_in_event(<<"channel.closed">>, User), + receive_user_in_event(<<"channel.closed">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. audit_connection(Config) -> Ch = declare_event_queue(Config, <<"connection.*">>), - User = proplists:get_value(rmq_username, Config), Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), - receive_user_in_event(<<"connection.created">>, User), + receive_user_in_event(<<"connection.created">>, Config), %% Username is not available in connection_close rabbit_ct_client_helpers:close_connection(Conn), - receive_event(<<"connection.closed">>, ?TAG, undefined), + Headers = receive_event(<<"connection.closed">>, user_key(Config), undefined), + case group_name(Config) of + amqp_0_9_1 -> + ?assert(lists:keymember(<<"client_properties">>, 1, Headers)); + amqp_1_0 -> + {table, ClientProps} = rabbit_misc:table_lookup(Headers, <<"x-opt-client-properties">>), + ?assertEqual({longstr, <<"Erlang">>}, + rabbit_misc:table_lookup(ClientProps, <<"platform">>)), + {table, Caps} = rabbit_misc:table_lookup(ClientProps, <<"capabilities">>), + ?assertEqual({bool, true}, + rabbit_misc:table_lookup(Caps, <<"basic.nack">>)), + ?assertEqual({bool, true}, + rabbit_misc:table_lookup(Caps, <<"connection.blocked">>)) + end, rabbit_ct_client_helpers:close_channel(Ch), ok. audit_direct_connection(Config) -> Ch = declare_event_queue(Config, <<"connection.*">>), - User = proplists:get_value(rmq_username, Config), Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config), - receive_user_in_event(<<"connection.created">>, User), + receive_user_in_event(<<"connection.created">>, Config), rabbit_ct_client_helpers:close_connection(Conn), - receive_event(<<"connection.closed">>, ?TAG, undefined), + receive_event(<<"connection.closed">>, user_key(Config), undefined), rabbit_ct_client_helpers:close_channel(Ch), ok. audit_consumer(Config) -> Ch = declare_event_queue(Config, <<"consumer.*">>), - User = proplists:get_value(rmq_username, Config), - receive_user_in_event(<<"consumer.created">>, User), + receive_user_in_event(<<"consumer.created">>, Config), #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, self()), CTag = receive #'basic.consume_ok'{consumer_tag = C} -> C end, - receive_user_in_event(<<"consumer.created">>, User), + receive_user_in_event(<<"consumer.created">>, Config), amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), - receive_user_in_event(<<"consumer.deleted">>, User), + receive_user_in_event(<<"consumer.deleted">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -298,11 +366,10 @@ audit_exchange_internal_parameter(Config) -> #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = X}), - User = proplists:get_value(rmq_username, Config), %% Exchange deletion sets and clears a runtime parameter which acts as a %% kind of lock: - receive_user_in_event(<<"parameter.set">>, User), - receive_user_in_event(<<"parameter.cleared">>, User), + receive_user_in_event(<<"parameter.set">>, Config), + receive_user_in_event(<<"parameter.cleared">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -315,11 +382,11 @@ audit_parameter(Config) -> ok = rabbit_ct_broker_helpers:set_parameter( Config, 0, VHost, <<"vhost-limits">>, <<"limits">>, [{<<"max-connections">>, 200}], User), - receive_user_in_event(<<"parameter.set">>, User), + receive_user_in_event(<<"parameter.set">>, User, Config), ok = rabbit_ct_broker_helpers:clear_parameter( Config, 0, VHost, <<"vhost-limits">>, <<"limits">>, User), - receive_user_in_event(<<"parameter.cleared">>, User), + receive_user_in_event(<<"parameter.cleared">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -330,10 +397,10 @@ audit_policy(Config) -> rabbit_ct_broker_helpers:set_policy(Config, 0, <<".*">>, <<"all">>, <<"queues">>, [{<<"max-length-bytes">>, 10000}], User), - receive_user_in_event(<<"policy.set">>, User), + receive_user_in_event(<<"policy.set">>, User, Config), ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<".*">>, User), - receive_user_in_event(<<"policy.cleared">>, User), + receive_user_in_event(<<"policy.cleared">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -346,11 +413,11 @@ audit_vhost_limit(Config) -> ok = rabbit_ct_broker_helpers:set_parameter( Config, 0, VHost, <<"vhost-limits">>, <<"limits">>, [{<<"max-connections">>, 200}], User), - receive_user_in_event(<<"vhost.limits.set">>, User), + receive_user_in_event(<<"vhost.limits.set">>, User, Config), ok = rabbit_ct_broker_helpers:clear_parameter( Config, 0, VHost, <<"vhost-limits">>, <<"limits">>, User), - receive_user_in_event(<<"vhost.limits.cleared">>, User), + receive_user_in_event(<<"vhost.limits.cleared">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -361,10 +428,10 @@ audit_user(Config) -> User = <<"Wabbit">>, rabbit_ct_broker_helpers:add_user(Config, 0, User, User, ActingUser), - receive_user_in_event(<<"user.created">>, ActingUser), + receive_user_in_event(<<"user.created">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), - receive_user_in_event(<<"user.deleted">>, ActingUser), + receive_user_in_event(<<"user.deleted">>, ActingUser, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -376,10 +443,10 @@ audit_user_password(Config) -> rabbit_ct_broker_helpers:add_user(Config, 0, User, User, ActingUser), rabbit_ct_broker_helpers:change_password(Config, 0, User, <<"pass">>, ActingUser), - receive_user_in_event(<<"user.password.changed">>, ActingUser), + receive_user_in_event(<<"user.password.changed">>, ActingUser, Config), rabbit_ct_broker_helpers:clear_password(Config, 0, User, ActingUser), - receive_user_in_event(<<"user.password.cleared">>, ActingUser), + receive_user_in_event(<<"user.password.cleared">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), rabbit_ct_client_helpers:close_channel(Ch), @@ -392,7 +459,7 @@ audit_user_tags(Config) -> rabbit_ct_broker_helpers:add_user(Config, 0, User, User, ActingUser), rabbit_ct_broker_helpers:set_user_tags(Config, 0, User, [management], ActingUser), - receive_user_in_event(<<"user.tags.set">>, ActingUser), + receive_user_in_event(<<"user.tags.set">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), @@ -408,10 +475,10 @@ audit_permission(Config) -> rabbit_ct_broker_helpers:add_user(Config, 0, User, User, ActingUser), rabbit_ct_broker_helpers:set_permissions(Config, 0, User, VHost, <<".*">>, <<".*">>, <<".*">>, ActingUser), - receive_user_in_event(<<"permission.created">>, ActingUser), + receive_user_in_event(<<"permission.created">>, ActingUser, Config), rabbit_ct_broker_helpers:clear_permissions(Config, 0, User, VHost, ActingUser), - receive_user_in_event(<<"permission.deleted">>, ActingUser), + receive_user_in_event(<<"permission.deleted">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), rabbit_ct_client_helpers:close_channel(Ch), @@ -427,12 +494,12 @@ audit_topic_permission(Config) -> rabbit_ct_broker_helpers:rpc( Config, 0, rabbit_auth_backend_internal, set_topic_permissions, [User, VHost, <<"amq.topic">>, "^a", "^a", ActingUser]), - receive_user_in_event(<<"topic.permission.created">>, ActingUser), + receive_user_in_event(<<"topic.permission.created">>, ActingUser, Config), rabbit_ct_broker_helpers:rpc( Config, 0, rabbit_auth_backend_internal, clear_topic_permissions, [User, VHost, ActingUser]), - receive_user_in_event(<<"topic.permission.deleted">>, ActingUser), + receive_user_in_event(<<"topic.permission.deleted">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), rabbit_ct_client_helpers:close_channel(Ch), @@ -469,8 +536,8 @@ unregister(Config) -> lookup, [X])), ok. -%% Test that the event exchange works when publising and consuming via AMQP 1.0. -amqp_connection(Config) -> +%% Test the plugin publishing internally with AMQP 0.9.1 while the client uses AMQP 1.0. +amqp_0_9_1_amqp_connection(Config) -> QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), {Connection1, Session, LinkPair} = amqp_init(Config), @@ -498,6 +565,111 @@ amqp_connection(Config) -> ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection1). +%% Test the plugin publishing internally with AMQP 1.0 and the client using AMQP 1.0. +amqp_1_0_amqp_connection(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + {Connection1, Session, LinkPair} = amqp_init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName,#{}), + ok = rabbitmq_amqp_client:bind_queue( + LinkPair, QName, <<"amq.rabbitmq.event">>, <<"connection.*">>, #{}), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, settled), + + Now = os:system_time(millisecond), + OpnConf0 = amqp_connection_config(Config), + OpnConf = maps:update(container_id, <<"2nd container">>, OpnConf0), + {ok, Connection2} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, Connection2, opened}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual(<<>>, iolist_to_binary(amqp10_msg:body(Msg))), + MsgAnns = amqp10_msg:message_annotations(Msg), + ?assertMatch(#{<<"x-routing-key">> := <<"connection.created">>, + <<"x-opt-container-id">> := <<"2nd container">>, + <<"x-opt-channel-max">> := ChannelMax} + when is_integer(ChannelMax), + MsgAnns), + %% We expect to receive event properties that have complex types. + ClientProps = maps:get(<<"x-opt-client-properties">>, MsgAnns), + OtpRelease = integer_to_binary(?OTP_RELEASE), + ?assertMatch(#{ + {symbol, <<"version">>} := {utf8, _Version}, + {symbol, <<"product">>} := {utf8, <<"AMQP 1.0 client">>}, + {symbol, <<"platform">>} := {utf8, <<"Erlang/OTP ", OtpRelease/binary>>} + }, + maps:from_list(ClientProps)), + FormattedPid = maps:get(<<"x-opt-pid">>, MsgAnns), + + %% The formatted Pid should include the RabbitMQ node name: + ?assertMatch({match, _}, + re:run(FormattedPid, <<"rmq-ct-system_SUITE">>)), + + #{creation_time := CreationTime} = amqp10_msg:properties(Msg), + ?assert(is_integer(CreationTime)), + ?assert(CreationTime > Now - 5000), + ?assert(CreationTime < Now + 5000), + + ok = amqp10_client:close_connection(Connection2), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection1). + +%% Test that routing on specific event properties works. +headers_exchange(Config) -> + XName = <<"my headers exchange">>, + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + OpnConf = amqp_connection_config(Config), + {Connection, Session, LinkPair} = amqp_init(Config), + + ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{type => <<"headers">>}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue( + LinkPair, QName, XName, <<>>, + #{<<"x-opt-container-id">> => {utf8, <<"client-2">>}, + <<"x-match">> => {utf8, <<"any-with-x">>}}), + ok = rabbitmq_amqp_client:bind_exchange( + LinkPair, XName, <<"amq.rabbitmq.event">>, <<"connection.created">>, #{}), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, settled), + + %% Open two connections. + OpnConf1 = maps:update(container_id, <<"client-1">>, OpnConf), + {ok, Connection1} = amqp10_client:open_connection(OpnConf1), + receive {amqp10_event, {connection, Connection1, opened}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + OpnConf2 = maps:update(container_id, <<"client-2">>, OpnConf), + {ok, Connection2} = amqp10_client:open_connection(OpnConf2), + receive {amqp10_event, {connection, Connection2, opened}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + %% Thanks to routing via headers exchange on event property + %% x-opt-container-id = client-2 + %% we should only receive the second connection.created event. + ok = amqp10_client:flow_link_credit(Receiver, 2, never, true), + receive {amqp10_msg, Receiver, Msg} -> + ?assertMatch(#{<<"x-routing-key">> := <<"connection.created">>, + <<"x-opt-container-id">> := <<"client-2">>}, + amqp10_msg:message_annotations(Msg)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:close_connection(Connection1), + ok = amqp10_client:close_connection(Connection2), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection). + %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- @@ -516,17 +688,36 @@ declare_event_queue(Config, RoutingKey) -> end, Ch. -receive_user_in_event(Event, User) -> - receive_event(Event, ?TAG, {longstr, User}). +user_key(Config) -> + case group_name(Config) of + amqp_0_9_1 -> + <<"user_who_performed_action">>; + amqp_1_0 -> + <<"x-opt-user-who-performed-action">> + end. + +group_name(Config) -> + GroupProps = proplists:get_value(tc_group_properties, Config), + proplists:get_value(name, GroupProps). + +receive_user_in_event(Event, Config) -> + User = proplists:get_value(rmq_username, Config), + receive_user_in_event(Event, User, Config). + +receive_user_in_event(Event, User, Config) -> + Key = user_key(Config), + Value = {longstr, User}, + receive_event(Event, Key, Value). receive_event(Event, Key, Value) -> receive {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{props = #'P_basic'{headers = Headers}}} -> - Event = RoutingKey, - Value = rabbit_misc:table_lookup(Headers, Key) + ?assertEqual(Event, RoutingKey), + ?assertEqual(Value, rabbit_misc:table_lookup(Headers, Key)), + Headers after - 60000 -> + 10_000 -> throw({receive_event_timeout, Event, Key, Value}) end. @@ -534,9 +725,9 @@ receive_event(Event) -> receive {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{props = #'P_basic'{}}} -> - Event = RoutingKey + ?assertEqual(Event, RoutingKey) after - 60000 -> + 10_000 -> throw({receive_event_timeout, Event}) end. diff --git a/release-notes/4.1.0.md b/release-notes/4.1.0.md index 32ae19d73e1c..6ffd23bc853c 100644 --- a/release-notes/4.1.0.md +++ b/release-notes/4.1.0.md @@ -37,6 +37,11 @@ These metrics have already been emitted for AMQP 0.9.1 connections prior to Rabb * Session flow control state * Number of unconfirmed and unacknowledged messages +### Support publishing AMQP 1.0 messages to the Event Exchange +[PR #12714](https://github.com/rabbitmq/rabbitmq-server/pull/12714) allows the `rabbitmq_event_exchange` plugin to be configured to internally publish AMQP 1.0 instead of AMQP 0.9.1 messages to the `amq.rabbitmq.event` topic exchange. + +This feature allows AMQP 1.0 consumers to receive event properties containing complex types such as [lists](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) or [maps](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-map), for example queue arguments for the `queue.created` event or client provided properties for the `connection.created` event. + ### Prometheus histogram for message sizes [PR #12342](https://github.com/rabbitmq/rabbitmq-server/pull/12342) exposes a Prometheus histogram for message sizes received by RabbitMQ.