Skip to content

Commit

Permalink
Support modifier prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Oct 2, 2024
1 parent ef1f241 commit 8931a3f
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 25 deletions.
97 changes: 73 additions & 24 deletions deps/rabbit/src/rabbit_amqp_filtex.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,22 @@
-export([validate/1,
filter/2]).

-type filter_expression() :: {properties, [{atom(), term()}]} |
{application_properties, [{binary(), term()}]}.
-type simple_type() :: number() | binary() | atom().
-type affix() :: {suffix, non_neg_integer(), binary()} |
{prefix, non_neg_integer(), binary()}.
-type filter_expression_value() :: simple_type() | affix().
-type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}]} |
{application_properties, [{binary(), filter_expression_value()}]}.
-type filter_expressions() :: [filter_expression()].
-export_type([filter_expressions/0]).

-spec validate(tuple()) ->
{ok, filter_expression()} | error.
validate({described, Descriptor, {map, KVList}}) ->
validate0(Descriptor, KVList);
try validate0(Descriptor, KVList)
catch throw:{?MODULE, _, _} ->
error
end;
validate(_) ->
error.

Expand Down Expand Up @@ -68,6 +75,31 @@ filter0({application_properties, KVList}, Mc) ->
match_simple_type(null, _Val) ->
%% * The reference field value is NULL
true;
match_simple_type({suffix, SuffixSize, Suffix}, Val) ->
%% * Suffix. The message metadata field matches the expression if the ordinal values of the
%% characters of the suffix expression equal the ordinal values of the same number of
%% characters trailing the message metadata field value.
case is_binary(Val) of
true ->
case Val of
<<_:(size(Val) - SuffixSize)/binary, Suffix:SuffixSize/binary>> ->
true;
_ ->
false
end;
false ->
false
end;
match_simple_type({prefix, PrefixSize, Prefix}, Val) ->
%% * Prefix. The message metadata field matches the expression if the ordinal values of the
%% characters of the prefix expression equal the ordinal values of the same number of
%% characters leading the message metadata field value.
case Val of
<<Prefix:PrefixSize/binary, _/binary>> ->
true;
_ ->
false
end;
match_simple_type(RefVal, Val) ->
%% * the reference field value is of a floating-point or integer number type
%% and the message metadata field is of a different floating-point or integer number type,
Expand All @@ -84,7 +116,9 @@ validate0(Descriptor, KVList0) when
(Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse
Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER}) andalso
KVList0 =/= [] ->
KVList = lists:map(fun({{utf8, Key}, TaggedVal}) ->
KVList = lists:map(fun({{utf8, Key}, {utf8, String}}) ->
{Key, parse_string_modifier_prefix(String)};
({{utf8, Key}, TaggedVal}) ->
{Key, unwrap(TaggedVal)}
end, KVList0),
{ok, {application_properties, KVList}};
Expand All @@ -93,24 +127,24 @@ validate0(_, _) ->

validate_props([], Acc) ->
{ok, {properties, lists:reverse(Acc)}};
validate_props([{{symbol, <<"message-id">>}, {Type, Val}} | Rest], Acc) ->
case validate_message_id_type(Type) of
ok ->
validate_props([{{symbol, <<"message-id">>}, TaggedVal} | Rest], Acc) ->
case parse_message_id(TaggedVal) of
{ok, Val} ->
validate_props(Rest, [{message_id, Val} | Acc]);
error ->
error
end;
validate_props([{{symbol, <<"user-id">>}, {binary, Val}} | Rest], Acc) ->
validate_props(Rest, [{user_id, Val} | Acc]);
validate_props([{{symbol, <<"to">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{to, Val} | Acc]);
validate_props(Rest, [{to, parse_string_modifier_prefix(Val)} | Acc]);
validate_props([{{symbol, <<"subject">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{subject, Val} | Acc]);
validate_props(Rest, [{subject, parse_string_modifier_prefix(Val)} | Acc]);
validate_props([{{symbol, <<"reply-to">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{reply_to, Val} | Acc]);
validate_props([{{symbol, <<"correlation-id">>}, {Type, Val}} | Rest], Acc) ->
case validate_message_id_type(Type) of
ok ->
validate_props(Rest, [{reply_to, parse_string_modifier_prefix(Val)} | Acc]);
validate_props([{{symbol, <<"correlation-id">>}, TaggedVal} | Rest], Acc) ->
case parse_message_id(TaggedVal) of
{ok, Val} ->
validate_props(Rest, [{correlation_id, Val} | Acc]);
error ->
error
Expand All @@ -124,25 +158,40 @@ validate_props([{{symbol, <<"absolute-expiry-time">>}, {timestamp, Val}} | Rest]
validate_props([{{symbol, <<"creation-time">>}, {timestamp, Val}} | Rest], Acc) ->
validate_props(Rest, [{creation_time, Val} | Acc]);
validate_props([{{symbol, <<"group-id">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{group_id, Val} | Acc]);
validate_props(Rest, [{group_id, parse_string_modifier_prefix(Val)} | Acc]);
validate_props([{{symbol, <<"group-sequence">>}, {uint, Val}} | Rest], Acc) ->
validate_props(Rest, [{group_sequence, Val} | Acc]);
validate_props([{{symbol, <<"reply-to-group-id">>}, {utf8, Val}} | Rest], Acc) ->
validate_props(Rest, [{reply_to_group_id, Val} | Acc]);
validate_props(Rest, [{reply_to_group_id, parse_string_modifier_prefix(Val)} | Acc]);
validate_props(_, _) ->
error.

validate_message_id_type(ulong) ->
ok;
validate_message_id_type(uuid) ->
ok;
validate_message_id_type(binary) ->
ok;
validate_message_id_type(utf8) ->
ok;
validate_message_id_type(_) ->
parse_message_id({ulong, Val}) ->
{ok, Val};
parse_message_id({uuid, Val}) ->
{ok, Val};
parse_message_id({binary, Val}) ->
{ok, Val};
parse_message_id({utf8, Val}) ->
{ok, parse_string_modifier_prefix(Val)};
parse_message_id(_) ->
error.

%% [filtex-v1.0-wd09 4.1.1]
parse_string_modifier_prefix(<<"$s:", Suffix/binary>>) ->
%% "Escape prefix for case-sensitive matching of a string starting with ‘&’"
{suffix, size(Suffix), Suffix};
parse_string_modifier_prefix(<<"$p:", Prefix/binary>>) ->
%% "Escape prefix for case-sensitive matching of a string starting with ‘&’"
{prefix, size(Prefix), Prefix};
parse_string_modifier_prefix(<<"$$", _/binary>> = String) ->
%% "Escape prefix for case-sensitive matching of a string starting with ‘&’"
string:slice(String, 1);
parse_string_modifier_prefix(<<"$", _/binary>> = String) ->
throw({?MODULE, invalid_reference_field_value, String});
parse_string_modifier_prefix(String) ->
String.

unwrap({_Tag, V}) ->
V;
unwrap(V) ->
Expand Down
138 changes: 137 additions & 1 deletion deps/rabbit/test/amqp_filtex_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ groups() ->
properties_section,
application_properties_section,
multiple_sections,
filter_few_messages_from_many
filter_few_messages_from_many,
string_modifier
]}
].

Expand Down Expand Up @@ -443,6 +444,141 @@ filter_few_messages_from_many(Config) ->
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).

string_modifier(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(Stream),
{Connection, Session, LinkPair} = init(Config),
{ok, #{}} = rabbitmq_amqp_client:declare_queue(
LinkPair,
Stream,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
ok = wait_for_credit(Sender),

ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_properties(
#{to => <<"abc 1">>,
reply_to => <<"abc 2">>,
subject => <<"abc 3">>,
group_id => <<"abc 4">>,
reply_to_group_id => <<"abc 5">>,
message_id => {utf8, <<"abc 6">>},
correlation_id => <<"abc 7">>,
group_sequence => 16#ff_ff_ff_ff},
amqp10_msg:set_application_properties(
#{<<"k1">> => <<"abc 8">>,
<<"k2">> => <<"abc 9">>},
amqp10_msg:new(<<"t1">>, <<"m1">>)))),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_application_properties(
#{<<"k1">> => <<"abc">>},
amqp10_msg:new(<<"t2">>, <<"m2">>))),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_properties(
#{subject => <<"$Hello">>,
reply_to_group_id => <<"xyz 5">>},
amqp10_msg:new(<<"t3">>, <<"m3">>))),

ok = wait_for_accepts(3),
ok = detach_link_sync(Sender),
flush(sent),

PropsFilter1 = [
{{symbol, <<"to">>}, {utf8, <<"$p:abc ">>}},
{{symbol, <<"reply-to">>}, {utf8, <<"$p:abc">>}},
{{symbol, <<"subject">>}, {utf8, <<"$p:ab">>}},
{{symbol, <<"group-id">>}, {utf8, <<"$p:a">>}},
{{symbol, <<"reply-to-group-id">>}, {utf8, <<"$s:5">>}},
{{symbol, <<"correlation-id">>}, {utf8, <<"$s:abc 7">>}},
{{symbol, <<"message-id">>}, {utf8, <<"$p:abc 6">>}}
],
AppPropsFilter1 = [
{{utf8, <<"k1">>}, {utf8, <<"$s: 8">>}},
{{utf8, <<"k2">>}, {utf8, <<"$p:abc ">>}}
],
Filter1 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter1},
?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, Address,
settled, configuration, Filter1),
ok = amqp10_client:flow_link_credit(Receiver1, 10, never),
receive {amqp10_msg, Receiver1, R1M1} ->
?assertEqual([<<"m1">>], amqp10_msg:body(R1M1))
after 5000 -> ct:fail({missing_msg, ?LINE})
end,
ok = assert_no_msg_received(?LINE),
ok = detach_link_sync(Receiver1),

%% Same filters as before except for subject which shouldn't match anymore.
PropsFilter2 = lists:keyreplace(
{symbol, <<"subject">>}, 1, PropsFilter1,
{{symbol, <<"subject">>}, {utf8, <<"$s:xxxxxxxxxxxxxx">>}}),
Filter2 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter2},
?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, Address,
settled, configuration, Filter2),
ok = amqp10_client:flow_link_credit(Receiver2, 10, never),
ok = assert_no_msg_received(?LINE),
ok = detach_link_sync(Receiver2),

PropsFilter3 = [{{symbol, <<"reply-to-group-id">>}, {utf8, <<"$s: 5">>}}],
Filter3 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter3},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver3} = amqp10_client:attach_receiver_link(
Session, <<"receiver 3">>, Address,
settled, configuration, Filter3),
ok = amqp10_client:flow_link_credit(Receiver3, 10, never),
receive {amqp10_msg, Receiver3, R3M1} ->
?assertEqual([<<"m1">>], amqp10_msg:body(R3M1))
after 5000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, Receiver3, R3M3} ->
?assertEqual([<<"m3">>], amqp10_msg:body(R3M3))
after 5000 -> ct:fail({missing_msg, ?LINE})
end,
ok = detach_link_sync(Receiver3),

%% '$$" is the escape prefix for case-sensitive matching of a string starting with ‘&’
PropsFilter4 = [{{symbol, <<"subject">>}, {utf8, <<"$$Hello">>}}],
Filter4 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter4},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver4} = amqp10_client:attach_receiver_link(
Session, <<"receiver 4">>, Address,
settled, configuration, Filter4),
{ok, R4M3} = amqp10_client:get_msg(Receiver4),
?assertEqual([<<"m3">>], amqp10_msg:body(R4M3)),
ok = detach_link_sync(Receiver4),

%% Starting the reference field value with $ is invalid without using a valid modifier
%% prefix is invalid.
%% RabbitMQ should exclude this filter in its reply attach frame because
%% "the sending endpoint [RabbitMQ] sets the filter actually in place".
%% Hence, no filter expression is actually in place and we should receive all messages.
PropsFilter5 = [{{symbol, <<"subject">>}, {utf8, <<"$Hello">>}}],
Filter5 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter5},
<<"rabbitmq:stream-offset-spec">> => <<"first">>},
{ok, Receiver5} = amqp10_client:attach_receiver_link(
Session, <<"receiver 5">>, Address,
settled, configuration, Filter5),
{ok, R5M1} = amqp10_client:get_msg(Receiver5),
?assertEqual([<<"m1">>], amqp10_msg:body(R5M1)),
{ok, R5M2} = amqp10_client:get_msg(Receiver5),
?assertEqual([<<"m2">>], amqp10_msg:body(R5M2)),
{ok, R5M3} = amqp10_client:get_msg(Receiver5),
?assertEqual([<<"m3">>], amqp10_msg:body(R5M3)),
ok = detach_link_sync(Receiver5),

{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).

%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------
Expand Down

0 comments on commit 8931a3f

Please sign in to comment.