Skip to content

Commit

Permalink
Merge pull request #12714 from rabbitmq/amqp-event-exchange
Browse files Browse the repository at this point in the history
Support publishing AMQP 1.0 to Event Exchange
  • Loading branch information
michaelklishin authored Nov 14, 2024
2 parents 89b15cc + de804d1 commit 3e509c9
Show file tree
Hide file tree
Showing 10 changed files with 554 additions and 376 deletions.
11 changes: 2 additions & 9 deletions deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
-define(AMQP10_FOOTER, <<"x-amqp-1.0-footer">>).
-define(PROTOMOD, rabbit_framing_amqp_0_9_1).
-define(CLASS_ID, 60).
-define(LONGSTR_UTF8_LIMIT, 4096).

-opaque state() :: #content{}.

Expand Down Expand Up @@ -682,19 +681,13 @@ wrap(_Type, undefined) ->
wrap(Type, Val) ->
{Type, Val}.

from_091(longstr, V)
when is_binary(V) andalso
byte_size(V) =< ?LONGSTR_UTF8_LIMIT ->
%% if a longstr is longer than 4096 bytes we just assume it is binary
%% it _may_ still be valid utf8 but checking this for every longstr header
%% value is going to be excessively slow
case mc_util:is_utf8_no_null(V) of
from_091(longstr, V) ->
case mc_util:is_utf8_no_null_limited(V) of
true ->
{utf8, V};
false ->
{binary, V}
end;
from_091(longstr, V) -> {binary, V};
from_091(long, V) -> {long, V};
from_091(unsignedbyte, V) -> {ubyte, V};
from_091(short, V) -> {short, V};
Expand Down
13 changes: 13 additions & 0 deletions deps/rabbit/src/mc_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
-include("mc.hrl").

-export([is_valid_shortstr/1,
is_utf8_no_null_limited/1,
is_utf8_no_null/1,
uuid_to_urn_string/1,
urn_string_to_uuid/1,
Expand All @@ -12,12 +13,24 @@
is_x_header/1
]).

-define(UTF8_SCAN_LIMIT, 4096).

-spec is_valid_shortstr(term()) -> boolean().
is_valid_shortstr(Bin) when ?IS_SHORTSTR_LEN(Bin) ->
is_utf8_no_null(Bin);
is_valid_shortstr(_) ->
false.

-spec is_utf8_no_null_limited(term()) -> boolean().
is_utf8_no_null_limited(Bin)
when byte_size(Bin) =< ?UTF8_SCAN_LIMIT ->
is_utf8_no_null(Bin);
is_utf8_no_null_limited(_Term) ->
%% If longer than 4096 bytes, just assume it's not UTF-8.
%% It _may_ still be valid UTF-8 but checking this
%% on the hot path is going to be excessively slow.
false.

-spec is_utf8_no_null(term()) -> boolean().
is_utf8_no_null(Term) ->
utf8_scan(Term, fun (C) -> C > 0 end).
Expand Down
62 changes: 34 additions & 28 deletions deps/rabbit/test/mc_unit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -313,34 +313,37 @@ amqpl_amqp_bin_amqpl(_Config) ->
%% incoming amqpl converted to amqp, serialized / deserialized then converted
%% back to amqpl.
%% simulates a legacy message published then consumed to a stream
Props = #'P_basic'{content_type = <<"text/plain">>,
content_encoding = <<"gzip">>,
headers = [{<<"a-stream-offset">>, long, 99},
{<<"a-string">>, longstr, <<"a string">>},
{<<"a-bool">>, bool, false},
{<<"a-unsignedbyte">>, unsignedbyte, 1},
{<<"a-unsignedshort">>, unsignedshort, 1},
{<<"a-unsignedint">>, unsignedint, 1},
{<<"a-signedint">>, signedint, 1},
{<<"a-timestamp">>, timestamp, 1},
{<<"a-double">>, double, 1.0},
{<<"a-float">>, float, 1.0},
{<<"a-void">>, void, undefined},
{<<"a-binary">>, binary, <<"data">>},
{<<"a-array">>, array, [{long, 1}, {long, 2}]},
{<<"x-stream-filter">>, longstr, <<"apple">>}
],
delivery_mode = 2,
priority = 98,
correlation_id = <<"corr">> ,
reply_to = <<"reply-to">>,
expiration = <<"1">>,
message_id = <<"msg-id">>,
timestamp = 99,
type = <<"45">>,
user_id = <<"banana">>,
app_id = <<"rmq">>
},
String5k = binary:copy(<<"x">>, 5000),
Props = #'P_basic'{
content_type = <<"text/plain">>,
content_encoding = <<"gzip">>,
headers = [{<<"a-stream-offset">>, long, 99},
{<<"a-string">>, longstr, <<"a string">>},
{<<"a-very-long-string">>, longstr, String5k},
{<<"a-bool">>, bool, false},
{<<"a-unsignedbyte">>, unsignedbyte, 1},
{<<"a-unsignedshort">>, unsignedshort, 1},
{<<"a-unsignedint">>, unsignedint, 1},
{<<"a-signedint">>, signedint, 1},
{<<"a-timestamp">>, timestamp, 1},
{<<"a-double">>, double, 1.0},
{<<"a-float">>, float, 1.0},
{<<"a-void">>, void, undefined},
{<<"a-binary">>, binary, <<"data">>},
{<<"a-array">>, array, [{long, 1}, {long, 2}]},
{<<"x-stream-filter">>, longstr, <<"apple">>}
],
delivery_mode = 2,
priority = 98,
correlation_id = <<"corr">> ,
reply_to = <<"reply-to">>,
expiration = <<"1">>,
message_id = <<"msg-id">>,
timestamp = 99,
type = <<"45">>,
user_id = <<"banana">>,
app_id = <<"rmq">>
},
Content = #content{properties = Props,
payload_fragments_rev = [<<"data">>]},
Msg = mc:init(mc_amqpl, Content, annotations()),
Expand Down Expand Up @@ -404,6 +407,9 @@ amqpl_amqp_bin_amqpl(_Config) ->

?assertEqual({long, 99}, Get(<<"a-stream-offset">>, AP10)),
?assertEqual({utf8, <<"a string">>}, Get(<<"a-string">>, AP10)),
%% We expect that a very long string is not scanned for valid UTF-8
%% and instead directly turned into a binary.
?assertEqual({binary, String5k}, Get(<<"a-very-long-string">>, AP10)),
?assertEqual(false, Get(<<"a-bool">>, AP10)),
?assertEqual({ubyte, 1}, Get(<<"a-unsignedbyte">>, AP10)),
?assertEqual({ushort, 1}, Get(<<"a-unsignedshort">>, AP10)),
Expand Down
6 changes: 6 additions & 0 deletions deps/rabbitmq_event_exchange/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
PROJECT = rabbitmq_event_exchange
PROJECT_DESCRIPTION = Event Exchange Type

define PROJECT_ENV
[
{protocol, amqp_0_9_1}
]
endef

define PROJECT_APP_EXTRA_KEYS
{broker_version_requirements, []}
endef
Expand Down
151 changes: 2 additions & 149 deletions deps/rabbitmq_event_exchange/README.md
Original file line number Diff line number Diff line change
@@ -1,154 +1,7 @@
# RabbitMQ Event Exchange

## Overview

This plugin exposes the internal RabbitMQ event mechanism as messages that clients
can consume. It's useful
if you want to keep track of certain events, e.g. when queues, exchanges, bindings, users,
connections, channels are created and deleted. This plugin filters out stats
events, so you are almost certainly going to get better results using
the management plugin for stats.

## How it Works

It declares a topic exchange called `amq.rabbitmq.event` **in the default
virtual host**. All events are published to this exchange with routing
keys like 'exchange.created', 'binding.deleted' etc, so you can
subscribe to only the events you're interested in.

The exchange behaves similarly to 'amq.rabbitmq.log': everything gets
published there; if you don't trust a user with the information that
gets published, don't allow them access.


## Installation

This plugin ships with RabbitMQ. Like with all other plugins, it must be
enabled before it can be used:

```bash
[sudo] rabbitmq-plugins enable rabbitmq_event_exchange
```

## Event format

Each event has various properties associated with it. These are
translated into AMQP 0-9-1 data encoding and inserted in the message headers. The
**message body is always blank**.

## Events

So far RabbitMQ and related plugins emit events with the following routing keys:

### RabbitMQ Broker

Queue, Exchange and Binding events:

* `queue.deleted`
* `queue.created`
* `exchange.created`
* `exchange.deleted`
* `binding.created`
* `binding.deleted`

Connection and Channel events:

* `connection.created`
* `connection.closed`
* `channel.created`
* `channel.closed`

Consumer events:

* `consumer.created`
* `consumer.deleted`

Policy and Parameter events:

* `policy.set`
* `policy.cleared`
* `parameter.set`
* `parameter.cleared`

Virtual host events:

* `vhost.created`
* `vhost.deleted`
* `vhost.limits.set`
* `vhost.limits.cleared`

User related events:

* `user.authentication.success`
* `user.authentication.failure`
* `user.created`
* `user.deleted`
* `user.password.changed`
* `user.password.cleared`
* `user.tags.set`

Permission events:

* `permission.created`
* `permission.deleted`
* `topic.permission.created`
* `topic.permission.deleted`

Alarm events:

* `alarm.set`
* `alarm.cleared`

### Shovel Plugin

Worker events:

* `shovel.worker.status`
* `shovel.worker.removed`

### Federation Plugin

Link events:

* `federation.link.status`
* `federation.link.removed`

## Example

There is a usage example using the Java client in `examples/java`.


## Configuration

* `rabbitmq_event_exchange.vhost`: what vhost should the `amq.rabbitmq.event` exchange be declared in. Default: `rabbit.default_vhost` (`<<"/">>`).


## Uninstalling

If you want to remove the exchange which this plugin creates, first
disable the plugin and restart the broker. Then you can delete the exchange,
e.g. with :

rabbitmqctl eval 'rabbit_exchange:delete(rabbit_misc:r(<<"/">>, exchange, <<"amq.rabbitmq.event">>), false, <<"username">>).'


## Building from Source

Building is no different from [building other RabbitMQ plugins](https://www.rabbitmq.com/plugin-development.html).

TL;DR:

git clone https://gh.neting.cc.com/rabbitmq/rabbitmq-public-umbrella.git umbrella
cd umbrella
make co
make up BRANCH=stable
cd deps
git clone https://github.com/rabbitmq/rabbitmq-event-exchange.git rabbitmq_event_exchange
cd rabbitmq_event_exchange
make dist

See the [website](https://www.rabbitmq.com/docs/event-exchange) for documentation.

## License

Released under the Mozilla Public License 2.0,
the same as RabbitMQ.
Released under the Mozilla Public License 2.0, the same as RabbitMQ.
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@
fun(Conf) ->
list_to_binary(cuttlefish:conf_get("event_exchange.vhost", Conf))
end}.

{mapping, "event_exchange.protocol", "rabbitmq_event_exchange.protocol", [
{datatype, {enum, [amqp_0_9_1, amqp_1_0]}}
]}.
Loading

0 comments on commit 3e509c9

Please sign in to comment.