From 8931a3fdb9813b61a89e0fe8742d16d749b64197 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 2 Oct 2024 12:53:55 +0200 Subject: [PATCH] Support modifier prefix --- deps/rabbit/src/rabbit_amqp_filtex.erl | 97 ++++++++++++----- deps/rabbit/test/amqp_filtex_SUITE.erl | 138 ++++++++++++++++++++++++- 2 files changed, 210 insertions(+), 25 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_filtex.erl b/deps/rabbit/src/rabbit_amqp_filtex.erl index 1ae745eda265..562cf6300be7 100644 --- a/deps/rabbit/src/rabbit_amqp_filtex.erl +++ b/deps/rabbit/src/rabbit_amqp_filtex.erl @@ -13,15 +13,22 @@ -export([validate/1, filter/2]). --type filter_expression() :: {properties, [{atom(), term()}]} | - {application_properties, [{binary(), term()}]}. +-type simple_type() :: number() | binary() | atom(). +-type affix() :: {suffix, non_neg_integer(), binary()} | + {prefix, non_neg_integer(), binary()}. +-type filter_expression_value() :: simple_type() | affix(). +-type filter_expression() :: {properties, [{FieldName :: atom(), filter_expression_value()}]} | + {application_properties, [{binary(), filter_expression_value()}]}. -type filter_expressions() :: [filter_expression()]. -export_type([filter_expressions/0]). -spec validate(tuple()) -> {ok, filter_expression()} | error. validate({described, Descriptor, {map, KVList}}) -> - validate0(Descriptor, KVList); + try validate0(Descriptor, KVList) + catch throw:{?MODULE, _, _} -> + error + end; validate(_) -> error. @@ -68,6 +75,31 @@ filter0({application_properties, KVList}, Mc) -> match_simple_type(null, _Val) -> %% * The reference field value is NULL true; +match_simple_type({suffix, SuffixSize, Suffix}, Val) -> + %% * Suffix. The message metadata field matches the expression if the ordinal values of the + %% characters of the suffix expression equal the ordinal values of the same number of + %% characters trailing the message metadata field value. + case is_binary(Val) of + true -> + case Val of + <<_:(size(Val) - SuffixSize)/binary, Suffix:SuffixSize/binary>> -> + true; + _ -> + false + end; + false -> + false + end; +match_simple_type({prefix, PrefixSize, Prefix}, Val) -> + %% * Prefix. The message metadata field matches the expression if the ordinal values of the + %% characters of the prefix expression equal the ordinal values of the same number of + %% characters leading the message metadata field value. + case Val of + <> -> + true; + _ -> + false + end; match_simple_type(RefVal, Val) -> %% * the reference field value is of a floating-point or integer number type %% and the message metadata field is of a different floating-point or integer number type, @@ -84,7 +116,9 @@ validate0(Descriptor, KVList0) when (Descriptor =:= {symbol, ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER} orelse Descriptor =:= {ulong, ?DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER}) andalso KVList0 =/= [] -> - KVList = lists:map(fun({{utf8, Key}, TaggedVal}) -> + KVList = lists:map(fun({{utf8, Key}, {utf8, String}}) -> + {Key, parse_string_modifier_prefix(String)}; + ({{utf8, Key}, TaggedVal}) -> {Key, unwrap(TaggedVal)} end, KVList0), {ok, {application_properties, KVList}}; @@ -93,9 +127,9 @@ validate0(_, _) -> validate_props([], Acc) -> {ok, {properties, lists:reverse(Acc)}}; -validate_props([{{symbol, <<"message-id">>}, {Type, Val}} | Rest], Acc) -> - case validate_message_id_type(Type) of - ok -> +validate_props([{{symbol, <<"message-id">>}, TaggedVal} | Rest], Acc) -> + case parse_message_id(TaggedVal) of + {ok, Val} -> validate_props(Rest, [{message_id, Val} | Acc]); error -> error @@ -103,14 +137,14 @@ validate_props([{{symbol, <<"message-id">>}, {Type, Val}} | Rest], Acc) -> validate_props([{{symbol, <<"user-id">>}, {binary, Val}} | Rest], Acc) -> validate_props(Rest, [{user_id, Val} | Acc]); validate_props([{{symbol, <<"to">>}, {utf8, Val}} | Rest], Acc) -> - validate_props(Rest, [{to, Val} | Acc]); + validate_props(Rest, [{to, parse_string_modifier_prefix(Val)} | Acc]); validate_props([{{symbol, <<"subject">>}, {utf8, Val}} | Rest], Acc) -> - validate_props(Rest, [{subject, Val} | Acc]); + validate_props(Rest, [{subject, parse_string_modifier_prefix(Val)} | Acc]); validate_props([{{symbol, <<"reply-to">>}, {utf8, Val}} | Rest], Acc) -> - validate_props(Rest, [{reply_to, Val} | Acc]); -validate_props([{{symbol, <<"correlation-id">>}, {Type, Val}} | Rest], Acc) -> - case validate_message_id_type(Type) of - ok -> + validate_props(Rest, [{reply_to, parse_string_modifier_prefix(Val)} | Acc]); +validate_props([{{symbol, <<"correlation-id">>}, TaggedVal} | Rest], Acc) -> + case parse_message_id(TaggedVal) of + {ok, Val} -> validate_props(Rest, [{correlation_id, Val} | Acc]); error -> error @@ -124,25 +158,40 @@ validate_props([{{symbol, <<"absolute-expiry-time">>}, {timestamp, Val}} | Rest] validate_props([{{symbol, <<"creation-time">>}, {timestamp, Val}} | Rest], Acc) -> validate_props(Rest, [{creation_time, Val} | Acc]); validate_props([{{symbol, <<"group-id">>}, {utf8, Val}} | Rest], Acc) -> - validate_props(Rest, [{group_id, Val} | Acc]); + validate_props(Rest, [{group_id, parse_string_modifier_prefix(Val)} | Acc]); validate_props([{{symbol, <<"group-sequence">>}, {uint, Val}} | Rest], Acc) -> validate_props(Rest, [{group_sequence, Val} | Acc]); validate_props([{{symbol, <<"reply-to-group-id">>}, {utf8, Val}} | Rest], Acc) -> - validate_props(Rest, [{reply_to_group_id, Val} | Acc]); + validate_props(Rest, [{reply_to_group_id, parse_string_modifier_prefix(Val)} | Acc]); validate_props(_, _) -> error. -validate_message_id_type(ulong) -> - ok; -validate_message_id_type(uuid) -> - ok; -validate_message_id_type(binary) -> - ok; -validate_message_id_type(utf8) -> - ok; -validate_message_id_type(_) -> +parse_message_id({ulong, Val}) -> + {ok, Val}; +parse_message_id({uuid, Val}) -> + {ok, Val}; +parse_message_id({binary, Val}) -> + {ok, Val}; +parse_message_id({utf8, Val}) -> + {ok, parse_string_modifier_prefix(Val)}; +parse_message_id(_) -> error. +%% [filtex-v1.0-wd09 4.1.1] +parse_string_modifier_prefix(<<"$s:", Suffix/binary>>) -> + %% "Escape prefix for case-sensitive matching of a string starting with ‘&’" + {suffix, size(Suffix), Suffix}; +parse_string_modifier_prefix(<<"$p:", Prefix/binary>>) -> + %% "Escape prefix for case-sensitive matching of a string starting with ‘&’" + {prefix, size(Prefix), Prefix}; +parse_string_modifier_prefix(<<"$$", _/binary>> = String) -> + %% "Escape prefix for case-sensitive matching of a string starting with ‘&’" + string:slice(String, 1); +parse_string_modifier_prefix(<<"$", _/binary>> = String) -> + throw({?MODULE, invalid_reference_field_value, String}); +parse_string_modifier_prefix(String) -> + String. + unwrap({_Tag, V}) -> V; unwrap(V) -> diff --git a/deps/rabbit/test/amqp_filtex_SUITE.erl b/deps/rabbit/test/amqp_filtex_SUITE.erl index 5522d6baaadd..51469821a83b 100644 --- a/deps/rabbit/test/amqp_filtex_SUITE.erl +++ b/deps/rabbit/test/amqp_filtex_SUITE.erl @@ -42,7 +42,8 @@ groups() -> properties_section, application_properties_section, multiple_sections, - filter_few_messages_from_many + filter_few_messages_from_many, + string_modifier ]} ]. @@ -443,6 +444,141 @@ filter_few_messages_from_many(Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection). +string_modifier(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{to => <<"abc 1">>, + reply_to => <<"abc 2">>, + subject => <<"abc 3">>, + group_id => <<"abc 4">>, + reply_to_group_id => <<"abc 5">>, + message_id => {utf8, <<"abc 6">>}, + correlation_id => <<"abc 7">>, + group_sequence => 16#ff_ff_ff_ff}, + amqp10_msg:set_application_properties( + #{<<"k1">> => <<"abc 8">>, + <<"k2">> => <<"abc 9">>}, + amqp10_msg:new(<<"t1">>, <<"m1">>)))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_application_properties( + #{<<"k1">> => <<"abc">>}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_properties( + #{subject => <<"$Hello">>, + reply_to_group_id => <<"xyz 5">>}, + amqp10_msg:new(<<"t3">>, <<"m3">>))), + + ok = wait_for_accepts(3), + ok = detach_link_sync(Sender), + flush(sent), + + PropsFilter1 = [ + {{symbol, <<"to">>}, {utf8, <<"$p:abc ">>}}, + {{symbol, <<"reply-to">>}, {utf8, <<"$p:abc">>}}, + {{symbol, <<"subject">>}, {utf8, <<"$p:ab">>}}, + {{symbol, <<"group-id">>}, {utf8, <<"$p:a">>}}, + {{symbol, <<"reply-to-group-id">>}, {utf8, <<"$s:5">>}}, + {{symbol, <<"correlation-id">>}, {utf8, <<"$s:abc 7">>}}, + {{symbol, <<"message-id">>}, {utf8, <<"$p:abc 6">>}} + ], + AppPropsFilter1 = [ + {{utf8, <<"k1">>}, {utf8, <<"$s: 8">>}}, + {{utf8, <<"k2">>}, {utf8, <<"$p:abc ">>}} + ], + Filter1 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter1}, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + settled, configuration, Filter1), + ok = amqp10_client:flow_link_credit(Receiver1, 10, never), + receive {amqp10_msg, Receiver1, R1M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver1), + + %% Same filters as before except for subject which shouldn't match anymore. + PropsFilter2 = lists:keyreplace( + {symbol, <<"subject">>}, 1, PropsFilter1, + {{symbol, <<"subject">>}, {utf8, <<"$s:xxxxxxxxxxxxxx">>}}), + Filter2 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter2}, + ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER => {map, AppPropsFilter1}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + settled, configuration, Filter2), + ok = amqp10_client:flow_link_credit(Receiver2, 10, never), + ok = assert_no_msg_received(?LINE), + ok = detach_link_sync(Receiver2), + + PropsFilter3 = [{{symbol, <<"reply-to-group-id">>}, {utf8, <<"$s: 5">>}}], + Filter3 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter3}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + settled, configuration, Filter3), + ok = amqp10_client:flow_link_credit(Receiver3, 10, never), + receive {amqp10_msg, Receiver3, R3M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R3M1)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Receiver3, R3M3} -> + ?assertEqual([<<"m3">>], amqp10_msg:body(R3M3)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = detach_link_sync(Receiver3), + + %% '$$" is the escape prefix for case-sensitive matching of a string starting with ‘&’ + PropsFilter4 = [{{symbol, <<"subject">>}, {utf8, <<"$$Hello">>}}], + Filter4 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter4}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver4} = amqp10_client:attach_receiver_link( + Session, <<"receiver 4">>, Address, + settled, configuration, Filter4), + {ok, R4M3} = amqp10_client:get_msg(Receiver4), + ?assertEqual([<<"m3">>], amqp10_msg:body(R4M3)), + ok = detach_link_sync(Receiver4), + + %% Starting the reference field value with $ is invalid without using a valid modifier + %% prefix is invalid. + %% RabbitMQ should exclude this filter in its reply attach frame because + %% "the sending endpoint [RabbitMQ] sets the filter actually in place". + %% Hence, no filter expression is actually in place and we should receive all messages. + PropsFilter5 = [{{symbol, <<"subject">>}, {utf8, <<"$Hello">>}}], + Filter5 = #{?DESCRIPTOR_NAME_PROPERTIES_FILTER => {map, PropsFilter5}, + <<"rabbitmq:stream-offset-spec">> => <<"first">>}, + {ok, Receiver5} = amqp10_client:attach_receiver_link( + Session, <<"receiver 5">>, Address, + settled, configuration, Filter5), + {ok, R5M1} = amqp10_client:get_msg(Receiver5), + ?assertEqual([<<"m1">>], amqp10_msg:body(R5M1)), + {ok, R5M2} = amqp10_client:get_msg(Receiver5), + ?assertEqual([<<"m2">>], amqp10_msg:body(R5M2)), + {ok, R5M3} = amqp10_client:get_msg(Receiver5), + ?assertEqual([<<"m3">>], amqp10_msg:body(R5M3)), + ok = detach_link_sync(Receiver5), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + %% ------------------------------------------------------------------- %% Helpers %% -------------------------------------------------------------------