From 26068a0adad0108293447ca67fec5aa27a4327ac Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 20 Feb 2024 11:22:21 +0100 Subject: [PATCH] Protect receiving app from being overloaded What? Protect receiving application from being overloaded with new messages while still processing existing messages if the auto credit renewal feature of the Erlang AMQP 1.0 client library is used. This feature can therefore be thought of as a prefetch window equivalent in AMQP 0.9.1 or MQTT 5.0 property Receive Maximum. How? The credit auto renewal feature in RabbitMQ 3.x was wrongly implemented. This commit takes the same approach as done in the server: The incoming_unsettled map is hold in the link instead of in the session to accurately and quickly determine the number of unsettled messages for a receiving link. The amqp10_client lib will grant more credits to the sender when the sum of remaining link credits and number of unsettled deliveries falls below the threshold RenewWhenBelow. This avoids maintaning additional state like the `link_credit_unsettled` or an alternative delivery_count_settled sequence number which is more complex to implement correctly. This commit breaks the amqp10_client_session:disposition/6 API: This commit forces the client application to only range settle for a given link, i.e. not across multiple links on a given session at once. The latter is allowed according to the AMQP spec. --- deps/amqp10_client/Makefile | 3 - deps/amqp10_client/src/amqp10_client.erl | 34 +-- deps/amqp10_client/src/amqp10_client.hrl | 1 + .../src/amqp10_client_session.erl | 115 +++++----- deps/amqp10_client/test/system_SUITE.erl | 210 ++++++++++++++---- deps/rabbit/test/amqp_client_SUITE.erl | 60 +++-- deps/rabbit/test/amqp_credit_api_v2_SUITE.erl | 21 +- .../src/rabbit_amqp10_shovel.erl | 24 +- .../test/protocol_interop_SUITE.erl | 6 +- moduleindex.yaml | 34 +-- 10 files changed, 315 insertions(+), 193 deletions(-) diff --git a/deps/amqp10_client/Makefile b/deps/amqp10_client/Makefile index 7dd0f4c657a5..b42ca3017e45 100644 --- a/deps/amqp10_client/Makefile +++ b/deps/amqp10_client/Makefile @@ -51,9 +51,6 @@ include erlang.mk HEX_TARBALL_FILES += rabbitmq-components.mk \ git-revisions.txt -# Dialyze the tests. -DIALYZER_OPTS += --src -r test - # -------------------------------------------------------------------- # ActiveMQ for the testsuite. # -------------------------------------------------------------------- diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index c9939bc263e4..32f91a5f7aea 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -301,16 +301,19 @@ attach_link(Session, AttachArgs) -> %% This is asynchronous and will notify completion of the attach request to the %% caller using an amqp10_event of the following format: %% {amqp10_event, {link, LinkRef, {detached, Why}}} --spec detach_link(link_ref()) -> _. +-spec detach_link(link_ref()) -> ok | {error, term()}. detach_link(#link_ref{link_handle = Handle, session = Session}) -> amqp10_client_session:detach(Session, Handle). -%% @doc Grant credit to a sender. -%% The amqp10_client will automatically grant Credit to the sender when -%% the remaining link credit falls below the value of RenewWhenBelow. -%% If RenewWhenBelow is 'never' the client will never grant more credit. Instead -%% the caller will be notified when the link_credit reaches 0 with an -%% amqp10_event of the following format: +%% @doc Grant Credit to a sender. +%% +%% In addition, if RenewWhenBelow is an integer, the amqp10_client will automatically grant more +%% Credit to the sender when the sum of the remaining link credit and the number of unsettled +%% messages falls below the value of RenewWhenBelow. +%% `Credit + RenewWhenBelow - 1` is the maximum number of in-flight unsettled messages. +%% +%% If RenewWhenBelow is `never` the amqp10_client will never grant more credit. Instead the caller +%% will be notified when the link_credit reaches 0 with an amqp10_event of the following format: %% {amqp10_event, {link, LinkRef, credit_exhausted}} -spec flow_link_credit(link_ref(), Credit :: non_neg_integer(), RenewWhenBelow :: never | pos_integer()) -> ok. @@ -323,10 +326,16 @@ flow_link_credit(Ref, Credit, RenewWhenBelow) -> flow_link_credit(#link_ref{role = receiver, session = Session, link_handle = Handle}, Credit, RenewWhenBelow, Drain) - when RenewWhenBelow =:= never orelse + when + %% Drain together with auto renewal doesn't make sense, so disallow it in the API. + ((Drain) andalso RenewWhenBelow =:= never + orelse not(Drain)) + andalso + %% Check that the RenewWhenBelow value make sense. + (RenewWhenBelow =:= never orelse is_integer(RenewWhenBelow) andalso RenewWhenBelow > 0 andalso - RenewWhenBelow =< Credit -> + RenewWhenBelow =< Credit) -> Flow = #'v1_0.flow'{link_credit = {uint, Credit}, drain = Drain}, ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow). @@ -359,11 +368,10 @@ accept_msg(LinkRef, Msg) -> %% the chosen delivery state. -spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(), amqp10_client_types:delivery_state()) -> ok. -settle_msg(#link_ref{role = receiver, - session = Session}, Msg, Settlement) -> +settle_msg(LinkRef, Msg, Settlement) -> DeliveryId = amqp10_msg:delivery_id(Msg), - amqp10_client_session:disposition(Session, receiver, DeliveryId, - DeliveryId, true, Settlement). + amqp10_client_session:disposition(LinkRef, DeliveryId, DeliveryId, true, Settlement). + %% @doc Get a single message from a link. %% Flows a single link credit then awaits delivery or timeout. -spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}. diff --git a/deps/amqp10_client/src/amqp10_client.hrl b/deps/amqp10_client/src/amqp10_client.hrl index 137e82552199..99cad7578300 100644 --- a/deps/amqp10_client/src/amqp10_client.hrl +++ b/deps/amqp10_client/src/amqp10_client.hrl @@ -20,4 +20,5 @@ -record(link_ref, {role :: sender | receiver, session :: pid(), + %% locally chosen output handle link_handle :: non_neg_integer()}). diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index e728f4f5ce05..7b1cba641d76 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -21,7 +21,7 @@ detach/2, transfer/3, flow/4, - disposition/6 + disposition/5 ]). %% Private API @@ -131,8 +131,9 @@ available = 0 :: non_neg_integer(), drain = false :: boolean(), partial_transfers :: undefined | {#'v1_0.transfer'{}, [binary()]}, - auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()} - }). + auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()}, + incoming_unsettled = #{} :: #{delivery_number() => ok} + }). -record(state, {channel :: pos_integer(), @@ -155,7 +156,6 @@ connection_config :: amqp10_client_connection:connection_config(), outgoing_delivery_id = ?INITIAL_OUTGOING_DELIVERY_ID :: delivery_number(), outgoing_unsettled = #{} :: #{delivery_number() => {amqp10_msg:delivery_tag(), Notify :: pid()}}, - incoming_unsettled = #{} :: #{delivery_number() => output_handle()}, notify :: pid() }). @@ -204,14 +204,18 @@ transfer(Session, Amqp10Msg, Timeout) -> flow(Session, Handle, Flow, RenewWhenBelow) -> gen_statem:cast(Session, {flow_link, Handle, Flow, RenewWhenBelow}). --spec disposition(pid(), link_role(), delivery_number(), delivery_number(), boolean(), +%% Sending a disposition on a sender link (with receiver-settle-mode = second) +%% is currently unsupported. +-spec disposition(link_ref(), delivery_number(), delivery_number(), boolean(), amqp10_client_types:delivery_state()) -> ok. -disposition(Session, Role, First, Last, Settled, DeliveryState) -> - gen_statem:call(Session, {disposition, Role, First, Last, Settled, +disposition(#link_ref{role = receiver, + session = Session, + link_handle = Handle}, + First, Last, Settled, DeliveryState) -> + gen_statem:call(Session, {disposition, Handle, First, Last, Settled, DeliveryState}, ?TIMEOUT). - %% ------------------------------------------------------------------- %% Private API. %% ------------------------------------------------------------------- @@ -277,7 +281,7 @@ mapped(cast, 'end', State) -> send_end(State), {next_state, end_sent, State}; mapped(cast, {flow_link, OutHandle, Flow0, RenewWhenBelow}, State0) -> - State = send_flow_link(fun send/2, OutHandle, Flow0, RenewWhenBelow, State0), + State = send_flow_link(OutHandle, Flow0, RenewWhenBelow, State0), {keep_state, State}; mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, IncomingWindow}}}, #state{next_incoming_id = NII, @@ -367,45 +371,43 @@ mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle}, State = book_partial_transfer_received( State0#state{links = Links#{OutHandle => Link1}}), {keep_state, State}; -mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle}, - delivery_id = MaybeDeliveryId, - settled = Settled} = Transfer0, Payload0}, - #state{incoming_unsettled = Unsettled0} = State0) -> - +mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}}, + Payload0}, State0) -> {ok, #link{target = {pid, TargetPid}, - output_handle = OutHandle, - ref = LinkRef} = Link0} = - find_link_by_input_handle(InHandle, State0), + ref = LinkRef, + incoming_unsettled = Unsettled + } = Link0} = find_link_by_input_handle(InHandle, State0), - {Transfer, Payload, Link1} = complete_partial_transfer(Transfer0, Payload0, Link0), - Msg = decode_as_msg(Transfer, Payload), - - % stash the DeliveryId - not sure for what yet - Unsettled = case MaybeDeliveryId of - {uint, DeliveryId} when Settled =/= true -> - Unsettled0#{DeliveryId => OutHandle}; - _ -> - Unsettled0 - end, + {Transfer = #'v1_0.transfer'{settled = Settled, + delivery_id = {uint, DeliveryId}}, + Payload, Link1} = complete_partial_transfer(Transfer0, Payload0, Link0), + Msg = decode_as_msg(Transfer, Payload), + Link2 = case Settled of + true -> + Link1; + _ -> + %% "If not set on the first (or only) transfer for a (multi-transfer) delivery, + %% then the settled flag MUST be interpreted as being false." [2.7.5] + Link1#link{incoming_unsettled = Unsettled#{DeliveryId => ok}} + end, % link bookkeeping % notify when credit is exhausted (link_credit = 0) % detach the Link with a transfer-limit-exceeded error code if further % transfers are received - State1 = State0#state{incoming_unsettled = Unsettled}, - case book_transfer_received(State1, Link1) of - {ok, Link2, State2} -> + case book_transfer_received(State0, Link2) of + {ok, Link3, State1} -> % deliver TargetPid ! {amqp10_msg, LinkRef, Msg}, - State = auto_flow(Link2, State2), + State = auto_flow(Link3, State1), {keep_state, State}; - {credit_exhausted, Link2, State} -> + {credit_exhausted, Link3, State} -> TargetPid ! {amqp10_msg, LinkRef, Msg}, - notify_credit_exhausted(Link2), + notify_credit_exhausted(Link3), {keep_state, State}; - {transfer_limit_exceeded, Link2, State} -> - logger:warning("transfer_limit_exceeded for link ~tp", [Link2]), - Link = detach_with_error_cond(Link2, State, + {transfer_limit_exceeded, Link3, State} -> + logger:warning("transfer_limit_exceeded for link ~tp", [Link3]), + Link = detach_with_error_cond(Link3, State, ?V_1_0_LINK_ERROR_TRANSFER_LIMIT_EXCEEDED), {keep_state, update_link(Link, State)} end; @@ -501,12 +503,15 @@ mapped({call, From}, end; mapped({call, From}, - {disposition, Role, First, Last, Settled0, DeliveryState}, - #state{incoming_unsettled = Unsettled0} = State0) -> + {disposition, OutputHandle, First, Last, Settled0, DeliveryState}, + #state{links = Links} = State0) -> + #{OutputHandle := Link0 = #link{incoming_unsettled = Unsettled0}} = Links, Unsettled = serial_number:foldl(fun maps:remove/2, Unsettled0, First, Last), - State = State0#state{incoming_unsettled = Unsettled}, + Link = Link0#link{incoming_unsettled = Unsettled}, + State1 = State0#state{links = Links#{OutputHandle := Link}}, + State = auto_flow(Link, State1), Disposition = #'v1_0.disposition'{ - role = translate_role(Role), + role = translate_role(receiver), first = {uint, First}, last = {uint, Last}, settled = Settled0, @@ -599,7 +604,7 @@ send_transfer(Transfer0, Parts0, MaxMessageSize, #state{socket = Socket, {ok, length(Frames)} end. -send_flow_link(Send, OutHandle, +send_flow_link(OutHandle, #'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewWhenBelow, #state{links = Links, next_incoming_id = NII, @@ -625,7 +630,7 @@ send_flow_link(Send, OutHandle, %% initial attach frame from the sender this field MUST NOT be set." [2.7.4] delivery_count = maybe_uint(DeliveryCount), available = uint(Available)}, - ok = Send(Flow, State), + ok = send(Flow, State), State#state{links = Links#{OutHandle => Link#link{link_credit = Credit, auto_flow = AutoFlow}}}. @@ -777,8 +782,9 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _}, max_message_size = MaxMessageSize}, ok = Send(Attach, State), + LinkRef = make_link_ref(element(1, Role), self(), OutHandle), Link = #link{name = Name, - ref = make_link_ref(element(1, Role), self(), OutHandle), + ref = LinkRef, output_handle = OutHandle, state = attach_sent, role = element(1, Role), @@ -790,7 +796,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _}, {State#state{links = Links#{OutHandle => Link}, next_link_handle = NextLinkHandle, - link_index = LinkIndex#{Name => OutHandle}}, Link#link.ref}. + link_index = LinkIndex#{Name => OutHandle}}, LinkRef}. -spec handle_session_flow(#'v1_0.flow'{}, #state{}) -> #state{}. handle_session_flow(#'v1_0.flow'{next_incoming_id = MaybeNII, @@ -908,7 +914,6 @@ translate_delivery_state({modified, translate_delivery_state(released) -> #'v1_0.released'{}; translate_delivery_state(received) -> #'v1_0.received'{}. -translate_role(sender) -> false; translate_role(receiver) -> true. maybe_notify_link_credit(#link{role = sender, @@ -987,9 +992,11 @@ book_transfer_received(#state{next_incoming_id = NID, auto_flow(#link{link_credit = LC, auto_flow = {auto, RenewWhenBelow, Credit}, - output_handle = OutHandle}, State) - when LC < RenewWhenBelow -> - send_flow_link(fun send/2, OutHandle, + output_handle = OutHandle, + incoming_unsettled = Unsettled}, + State) + when LC + map_size(Unsettled) < RenewWhenBelow -> + send_flow_link(OutHandle, #'v1_0.flow'{link_credit = {uint, Credit}}, RenewWhenBelow, State); auto_flow(_, State) -> @@ -1045,7 +1052,8 @@ socket_send0({tcp, Socket}, Data) -> socket_send0({ssl, Socket}, Data) -> ssl:send(Socket, Data). --spec make_link_ref(_, _, _) -> link_ref(). +-spec make_link_ref(link_role(), pid(), output_handle()) -> + link_ref(). make_link_ref(Role, Session, Handle) -> #link_ref{role = Role, session = Session, link_handle = Handle}. @@ -1100,7 +1108,6 @@ format_status(Status = #{data := Data0}) -> connection_config = ConnectionConfig, outgoing_delivery_id = OutgoingDeliveryId, outgoing_unsettled = OutgoingUnsettled, - incoming_unsettled = IncomingUnsettled, notify = Notify } = Data0, Links = maps:map( @@ -1119,7 +1126,8 @@ format_status(Status = #{data := Data0}) -> available = Available, drain = Drain, partial_transfers = PartialTransfers0, - auto_flow = AutoFlow + auto_flow = AutoFlow, + incoming_unsettled = IncomingUnsettled }) -> PartialTransfers = case PartialTransfers0 of undefined -> @@ -1141,7 +1149,9 @@ format_status(Status = #{data := Data0}) -> available => Available, drain => Drain, partial_transfers => PartialTransfers, - auto_flow => AutoFlow} + auto_flow => AutoFlow, + incoming_unsettled => maps:size(IncomingUnsettled) + } end, Links0), Data = #{channel => Channel, remote_channel => RemoteChannel, @@ -1160,7 +1170,6 @@ format_status(Status = #{data := Data0}) -> connection_config => maps:remove(sasl, ConnectionConfig), outgoing_delivery_id => OutgoingDeliveryId, outgoing_unsettled => maps:size(OutgoingUnsettled), - incoming_unsettled => maps:size(IncomingUnsettled), notify => Notify}, Status#{data := Data}. diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 2147b0f156ce..62a7718657ef 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -514,16 +514,17 @@ subscribe(Config) -> <<"sub-receiver">>, QueueName, unsettled), ok = amqp10_client:flow_link_credit(Receiver, 10, never), - - _ = receive_messages(Receiver, 10), - % assert no further messages are delivered - timeout = receive_one(Receiver), - receive - {amqp10_event, {link, Receiver, credit_exhausted}} -> - ok - after 5000 -> - flush(), - exit(credit_exhausted_assert) + [begin + receive {amqp10_msg, Receiver, Msg} -> + ok = amqp10_client:accept_msg(Receiver, Msg) + after 2000 -> ct:fail(timeout) + end + end || _ <- lists:seq(1, 10)], + ok = assert_no_message(Receiver), + + receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok + after 5000 -> flush(), + exit(credit_exhausted_assert) end, ok = amqp10_client:end_session(Session), @@ -539,16 +540,121 @@ subscribe_with_auto_flow(Config) -> <<"sub-sender">>, QueueName), await_link(Sender, credited, link_credit_timeout), - _ = publish_messages(Sender, <<"banana">>, 10), - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, - <<"sub-receiver">>, - QueueName, unsettled), - ok = amqp10_client:flow_link_credit(Receiver, 5, 2), - - _ = receive_messages(Receiver, 10), - % assert no further messages are delivered - timeout = receive_one(Receiver), + _ = publish_messages(Sender, <<"banana">>, 20), + %% Use sender settle mode 'settled'. + {ok, R1} = amqp10_client:attach_receiver_link( + Session, <<"sub-receiver-1">>, QueueName, settled), + await_link(R1, attached, attached_timeout), + ok = amqp10_client:flow_link_credit(R1, 5, 2), + ?assertEqual(20, count_received_messages(R1)), + ok = amqp10_client:detach_link(R1), + + _ = publish_messages(Sender, <<"banana">>, 30), + %% Use sender settle mode 'unsettled'. + %% This should require us to manually settle message in order to receive more messages. + {ok, R2} = amqp10_client:attach_receiver_link(Session, <<"sub-receiver-2">>, QueueName, unsettled), + await_link(R2, attached, attached_timeout), + ok = amqp10_client:flow_link_credit(R2, 5, 2), + %% We should receive exactly 5 messages. + [M1, _M2, M3, M4, M5] = receive_messages(R2, 5), + ok = assert_no_message(R2), + + %% Even when we accept the first 3 messages, the number of unsettled messages has not yet fallen below 2. + %% Therefore, the client should not yet grant more credits to the sender. + ok = amqp10_client_session:disposition( + R2, amqp10_msg:delivery_id(M1), amqp10_msg:delivery_id(M3), true, accepted), + ok = assert_no_message(R2), + + %% When we accept 1 more message (the order in which we accept shouldn't matter, here we accept M5 before M4), + %% the number of unsettled messages now falls below 2 (since only M4 is left unsettled). + %% Therefore, the client should grant 5 credits to the sender. + %% Therefore, we should receive 5 more messages. + ok = amqp10_client:accept_msg(R2, M5), + [_M6, _M7, _M8, _M9, M10] = receive_messages(R2, 5), + ok = assert_no_message(R2), + + %% It shouldn't matter how we settle messages, therefore we use 'rejected' this time. + %% Settling all in flight messages should cause us to receive exactly 5 more messages. + ok = amqp10_client_session:disposition( + R2, amqp10_msg:delivery_id(M4), amqp10_msg:delivery_id(M10), true, rejected), + [M11, _M12, _M13, _M14, M15] = receive_messages(R2, 5), + ok = assert_no_message(R2), + + %% Dynamically decrease link credit. + %% Since we explicitly tell to grant 3 new credits now, we expect to receive 3 more messages. + ok = amqp10_client:flow_link_credit(R2, 3, 3), + [M16, _M17, M18] = receive_messages(R2, 3), + ok = assert_no_message(R2), + + ok = amqp10_client_session:disposition( + R2, amqp10_msg:delivery_id(M11), amqp10_msg:delivery_id(M15), true, accepted), + %% However, the RenewWhenBelow=3 still refers to all unsettled messages. + %% Right now we have 3 messages (M16, M17, M18) unsettled. + ok = assert_no_message(R2), + + %% Settling 1 out of these 3 messages causes RenewWhenBelow to fall below 3 resulting + %% in 3 new messages to be received. + ok = amqp10_client:accept_msg(R2, M18), + [_M19, _M20, _M21] = receive_messages(R2, 3), + ok = assert_no_message(R2), + + ok = amqp10_client:flow_link_credit(R2, 3, never, true), + [_M22, _M23, M24] = receive_messages(R2, 3), + ok = assert_no_message(R2), + + %% Since RenewWhenBelow = never, we expect to receive no new messages despite settling. + ok = amqp10_client_session:disposition( + R2, amqp10_msg:delivery_id(M16), amqp10_msg:delivery_id(M24), true, rejected), + ok = assert_no_message(R2), + + ok = amqp10_client:flow_link_credit(R2, 2, never, false), + [M25, _M26] = receive_messages(R2, 2), + ok = assert_no_message(R2), + + ok = amqp10_client:flow_link_credit(R2, 3, 3), + [_M27, _M28, M29] = receive_messages(R2, 3), + ok = assert_no_message(R2), + + ok = amqp10_client_session:disposition( + R2, amqp10_msg:delivery_id(M25), amqp10_msg:delivery_id(M29), true, accepted), + [M30] = receive_messages(R2, 1), + ok = assert_no_message(R2), + ok = amqp10_client:accept_msg(R2, M30), + %% The sender queue is empty now. + ok = assert_no_message(R2), + + ok = amqp10_client:flow_link_credit(R2, 3, 1), + _ = publish_messages(Sender, <<"banana">>, 1), + [M31] = receive_messages(R2, 1), + ok = amqp10_client:accept_msg(R2, M31), + + %% Since function flow_link_credit/3 documents + %% "if RenewWhenBelow is an integer, the amqp10_client will automatically grant more + %% Credit to the sender when the sum of the remaining link credit and the number of + %% unsettled messages falls below the value of RenewWhenBelow." + %% our expectation is that the amqp10_client has not renewed credit since the sum of + %% remaining link credit (2) and unsettled messages (0) is 2. + %% + %% Therefore, when we publish another 3 messages, we expect to only receive only 2 messages! + _ = publish_messages(Sender, <<"banana">>, 5), + [M32, M33] = receive_messages(R2, 2), + ok = assert_no_message(R2), + + %% When we accept both messages, the sum of the remaining link credit (0) and unsettled messages (0) + %% falls below RenewWhenBelow=1 causing the amqp10_client to grant 3 new credits. + ok = amqp10_client:accept_msg(R2, M32), + ok = assert_no_message(R2), + ok = amqp10_client:accept_msg(R2, M33), + + [M35, M36, M37] = receive_messages(R2, 3), + ok = amqp10_client:accept_msg(R2, M35), + ok = amqp10_client:accept_msg(R2, M36), + ok = amqp10_client:accept_msg(R2, M37), + %% The sender queue is empty now. + ok = assert_no_message(R2), + + ok = amqp10_client:detach_link(R2), ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). @@ -703,11 +809,19 @@ incoming_heartbeat(Config) -> %%% HELPERS %%% -receive_messages(Receiver, Num) -> - [begin - ct:pal("receive_messages ~tp", [T]), - ok = receive_one(Receiver) - end || T <- lists:seq(1, Num)]. +await_link(Who, What, Err) -> + receive + {amqp10_event, {link, Who0, What0}} + when Who0 =:= Who andalso + What0 =:= What -> + ok; + {amqp10_event, {link, Who0, {detached, Why}}} + when Who0 =:= Who -> + exit(Why) + after 5000 -> + flush(), + exit(Err) + end. publish_messages(Sender, Data, Num) -> [begin @@ -717,36 +831,42 @@ publish_messages(Sender, Data, Num) -> ok = await_disposition(Tag) end || T <- lists:seq(1, Num)]. -receive_one(Receiver) -> - receive - {amqp10_msg, Receiver0, Msg} - when Receiver0 =:= Receiver -> - amqp10_client:accept_msg(Receiver, Msg) - after 2000 -> - timeout - end. - await_disposition(DeliveryTag) -> receive {amqp10_disposition, {accepted, DeliveryTag0}} when DeliveryTag0 =:= DeliveryTag -> ok after 3000 -> flush(), - exit(dispostion_timeout) + ct:fail(dispostion_timeout) end. -await_link(Who, What, Err) -> +count_received_messages(Receiver) -> + count_received_messages0(Receiver, 0). + +count_received_messages0(Receiver, Count) -> receive - {amqp10_event, {link, Who0, What0}} - when Who0 =:= Who andalso - What0 =:= What -> - ok; - {amqp10_event, {link, Who0, {detached, Why}}} - when Who0 =:= Who -> - exit(Why) - after 5000 -> - flush(), - exit(Err) + {amqp10_msg, Receiver, _Msg} -> + count_received_messages0(Receiver, Count + 1) + after 200 -> + Count + end. + +receive_messages(Receiver, N) -> + receive_messages0(Receiver, N, []). + +receive_messages0(_Receiver, 0, Acc) -> + lists:reverse(Acc); +receive_messages0(Receiver, N, Acc) -> + receive + {amqp10_msg, Receiver, Msg} -> + receive_messages0(Receiver, N - 1, [Msg | Acc]) + after 5000 -> + ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}}) + end. + +assert_no_message(Receiver) -> + receive {amqp10_msg, Receiver, Msg} -> ct:fail({unexpected_message, Msg}) + after 50 -> ok end. to_bin(X) when is_list(X) -> diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 884f54033a9f..c450ca06e346 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -492,7 +492,7 @@ receiver_settle_mode_first(Config) -> ?assertEqual(DeliveryIdMsg9, serial_number_increment(DeliveryIdMsg8)), Last1 = serial_number_increment(serial_number_increment(DeliveryIdMsg9)), ok = amqp10_client_session:disposition( - Session, receiver, DeliveryIdMsg8, Last1, true, accepted), + Receiver, DeliveryIdMsg8, Last1, true, accepted), assert_messages(QName, 8, 7, Config), %% 2. Ack a range smaller than the number of unacked messages where all delivery IDs @@ -501,7 +501,7 @@ receiver_settle_mode_first(Config) -> DeliveryIdMsg4 = amqp10_msg:delivery_id(Msg4), DeliveryIdMsg6 = amqp10_msg:delivery_id(Msg6), ok = amqp10_client_session:disposition( - Session, receiver, DeliveryIdMsg4, DeliveryIdMsg6, true, accepted), + Receiver, DeliveryIdMsg4, DeliveryIdMsg6, true, accepted), assert_messages(QName, 5, 4, Config), %% 3. Ack a range larger than the number of unacked messages where all delivery IDs @@ -509,7 +509,7 @@ receiver_settle_mode_first(Config) -> DeliveryIdMsg2 = amqp10_msg:delivery_id(Msg2), DeliveryIdMsg7 = amqp10_msg:delivery_id(Msg7), ok = amqp10_client_session:disposition( - Session, receiver, DeliveryIdMsg2, DeliveryIdMsg7, true, accepted), + Receiver, DeliveryIdMsg2, DeliveryIdMsg7, true, accepted), assert_messages(QName, 2, 1, Config), %% Consume the last message. @@ -523,16 +523,16 @@ receiver_settle_mode_first(Config) -> DeliveryIdMsg10 = amqp10_msg:delivery_id(Msg10), Last2 = serial_number_increment(DeliveryIdMsg10), ok = amqp10_client_session:disposition( - Session, receiver, DeliveryIdMsg1, Last2, true, accepted), + Receiver, DeliveryIdMsg1, Last2, true, accepted), assert_messages(QName, 0, 0, Config), %% 5. Ack single delivery ID when there are no unacked messages. ok = amqp10_client_session:disposition( - Session, receiver, DeliveryIdMsg1, DeliveryIdMsg1, true, accepted), + Receiver, DeliveryIdMsg1, DeliveryIdMsg1, true, accepted), %% 6. Ack multiple delivery IDs when there are no unacked messages. ok = amqp10_client_session:disposition( - Session, receiver, DeliveryIdMsg1, DeliveryIdMsg6, true, accepted), + Receiver, DeliveryIdMsg1, DeliveryIdMsg6, true, accepted), assert_messages(QName, 0, 0, Config), ok = amqp10_client:detach_link(Receiver), @@ -684,7 +684,7 @@ amqp_stream_amqpl(Config) -> #amqp_msg{props = #'P_basic'{type = <<"amqp-1.0">>}}} -> ok after 5000 -> - exit(basic_deliver_timeout) + ct:fail(basic_deliver_timeout) end, #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -1736,7 +1736,7 @@ detach_requeues(Config) -> %% Receiver2 accepts all 4 messages. ok = amqp10_client_session:disposition( - Session, receiver, + Receiver2, amqp10_msg:delivery_id(Msg2), amqp10_msg:delivery_id(Msg3b), true, accepted), @@ -2328,29 +2328,20 @@ async_notify(SenderSettleMode, QType, Config) -> end, %% Initially, grant 10 credits to the sending queue. - %% Whenever credits drops below 5, renew back to 10. + %% Whenever the sum of credits and number of unsettled messages drops below 5, renew back to 10. ok = amqp10_client:flow_link_credit(Receiver, 10, 5), %% We should receive all messages. - Msgs = receive_messages(Receiver, NumMsgs), + Accept = case SenderSettleMode of + settled -> false; + unsettled -> true + end, + Msgs = receive_all_messages(Receiver, Accept), FirstMsg = hd(Msgs), LastMsg = lists:last(Msgs), ?assertEqual([<<"1">>], amqp10_msg:body(FirstMsg)), ?assertEqual([integer_to_binary(NumMsgs)], amqp10_msg:body(LastMsg)), - case SenderSettleMode of - settled -> - ok; - unsettled -> - ok = amqp10_client_session:disposition( - Session, - receiver, - amqp10_msg:delivery_id(FirstMsg), - amqp10_msg:delivery_id(LastMsg), - true, - accepted) - end, - %% No further messages should be delivered. receive Unexpected -> ct:fail({received_unexpected_message, Unexpected}) after 50 -> ok @@ -2503,8 +2494,7 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) ?assertEqual([<<"1">>], amqp10_msg:body(FirstMsg)), ?assertEqual([integer_to_binary(NumMsgs)], amqp10_msg:body(LastMsg)), ok = amqp10_client_session:disposition( - Session, - receiver, + Receiver, amqp10_msg:delivery_id(FirstMsg), amqp10_msg:delivery_id(LastMsg), true, @@ -2803,7 +2793,7 @@ stream_filtering(Config) -> #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, <<"rabbitmq:stream-filter">> => <<"apple">>}), ok = amqp10_client:flow_link_credit(AppleReceiver, 100, 10), - AppleMessages = receive_all_messages(AppleReceiver, []), + AppleMessages = receive_all_messages(AppleReceiver, true), %% we should get less than all the waves combined ?assert(length(AppleMessages) < WaveCount * 3), %% client-side filtering @@ -2824,7 +2814,7 @@ stream_filtering(Config) -> #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, <<"rabbitmq:stream-filter">> => [<<"apple">>, <<"orange">>]}), ok = amqp10_client:flow_link_credit(AppleOrangeReceiver, 100, 10), - AppleOrangeMessages = receive_all_messages(AppleOrangeReceiver, []), + AppleOrangeMessages = receive_all_messages(AppleOrangeReceiver, true), %% we should get less than all the waves combined ?assert(length(AppleOrangeMessages) < WaveCount * 3), %% client-side filtering @@ -2848,7 +2838,7 @@ stream_filtering(Config) -> <<"rabbitmq:stream-match-unfiltered">> => {boolean, true}}), ok = amqp10_client:flow_link_credit(AppleUnfilteredReceiver, 100, 10), - AppleUnfilteredMessages = receive_all_messages(AppleUnfilteredReceiver, []), + AppleUnfilteredMessages = receive_all_messages(AppleUnfilteredReceiver, true), %% we should get less than all the waves combined ?assert(length(AppleUnfilteredMessages) < WaveCount * 3), %% client-side filtering @@ -3351,10 +3341,16 @@ classic_priority_queue(Config) -> %% internal %% -receive_all_messages(Receiver, Acc) -> +receive_all_messages(Receiver, Accept) -> + receive_all_messages0(Receiver, Accept, []). + +receive_all_messages0(Receiver, Accept, Acc) -> receive {amqp10_msg, Receiver, Msg} -> - ok = amqp10_client:accept_msg(Receiver, Msg), - receive_all_messages(Receiver, [Msg | Acc]) + case Accept of + true -> ok = amqp10_client:accept_msg(Receiver, Msg); + false -> ok + end, + receive_all_messages0(Receiver, Accept, [Msg | Acc]) after 500 -> lists:reverse(Acc) end. @@ -3501,7 +3497,7 @@ receive_messages0(Receiver, N, Acc) -> {amqp10_msg, Receiver, Msg} -> receive_messages0(Receiver, N - 1, [Msg | Acc]) after 5000 -> - exit({timeout, {num_received, length(Acc)}, {num_missing, N}}) + ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}}) end. count_received_messages(Receiver) -> diff --git a/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl b/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl index b9f053db6ce9..fb46a7def31f 100644 --- a/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl +++ b/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl @@ -121,8 +121,8 @@ credit_api_v2(Config) -> filter => #{}}, {ok, QQReceiver1} = amqp10_client:attach_link(Session, QQAttachArgs), - ok = consume_and_accept(10, CQReceiver1, Session), - ok = consume_and_accept(10, QQReceiver1, Session), + ok = consume_and_accept(10, CQReceiver1), + ok = consume_and_accept(10, QQReceiver1), ?assertEqual(ok, rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FUNCTION_NAME)), @@ -133,12 +133,12 @@ credit_api_v2(Config) -> Session, <<"cq receiver 2">>, CQAddr, unsettled), {ok, QQReceiver2} = amqp10_client:attach_receiver_link( Session, <<"qq receiver 2">>, QQAddr, unsettled), - ok = consume_and_accept(10, CQReceiver2, Session), - ok = consume_and_accept(10, QQReceiver2, Session), + ok = consume_and_accept(10, CQReceiver2), + ok = consume_and_accept(10, QQReceiver2), %% Consume via with credit API v1 - ok = consume_and_accept(10, CQReceiver1, Session), - ok = consume_and_accept(10, QQReceiver1, Session), + ok = consume_and_accept(10, CQReceiver1), + ok = consume_and_accept(10, QQReceiver1), %% Detach the credit API v1 links and attach with the same output handle. ok = detach_sync(CQReceiver1), @@ -147,8 +147,8 @@ credit_api_v2(Config) -> {ok, QQReceiver3} = amqp10_client:attach_link(Session, QQAttachArgs), %% The new links should use credit API v2 - ok = consume_and_accept(10, CQReceiver3, Session), - ok = consume_and_accept(10, QQReceiver3, Session), + ok = consume_and_accept(10, CQReceiver3), + ok = consume_and_accept(10, QQReceiver3), flush(pre_drain), %% Draining should also work. @@ -181,12 +181,11 @@ credit_api_v2(Config) -> after 5000 -> ct:fail(missing_closed) end. -consume_and_accept(NumMsgs, Receiver, Session) -> +consume_and_accept(NumMsgs, Receiver) -> ok = amqp10_client:flow_link_credit(Receiver, NumMsgs, never), Msgs = receive_messages(Receiver, NumMsgs), ok = amqp10_client_session:disposition( - Session, - receiver, + Receiver, amqp10_msg:delivery_id(hd(Msgs)), amqp10_msg:delivery_id(lists:last(Msgs)), true, diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index 147626de7f72..492fd535d959 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -283,30 +283,24 @@ close_dest(#{dest := #{current := #{conn := Conn}}}) -> close_dest(_Config) -> ok. -spec ack(Tag :: tag(), Multi :: boolean(), state()) -> state(). -ack(Tag, true, State = #{source := #{current := #{session := Session}, +ack(Tag, true, State = #{source := #{current := #{link := LinkRef}, last_acked_tag := LastTag} = Src}) -> First = LastTag + 1, - ok = amqp10_client_session:disposition(Session, receiver, First, - Tag, true, accepted), + ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, accepted), State#{source => Src#{last_acked_tag => Tag}}; -ack(Tag, false, State = #{source := #{current := - #{session := Session}} = Src}) -> - ok = amqp10_client_session:disposition(Session, receiver, Tag, - Tag, true, accepted), +ack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) -> + ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, accepted), State#{source => Src#{last_acked_tag => Tag}}. -spec nack(Tag :: tag(), Multi :: boolean(), state()) -> state(). -nack(Tag, false, State = #{source := - #{current := #{session := Session}} = Src}) -> +nack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) -> % the tag is the same as the deliveryid - ok = amqp10_client_session:disposition(Session, receiver, Tag, - Tag, false, rejected), + ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, rejected), State#{source => Src#{last_nacked_tag => Tag}}; -nack(Tag, true, State = #{source := #{current := #{session := Session}, - last_nacked_tag := LastTag} = Src}) -> +nack(Tag, true, State = #{source := #{current := #{link := LinkRef}, + last_nacked_tag := LastTag} = Src}) -> First = LastTag + 1, - ok = amqp10_client_session:disposition(Session, receiver, First, - Tag, true, accepted), + ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, rejected), State#{source => Src#{last_nacked_tag => Tag}}. status(#{dest := #{current := #{link_state := attached}}}) -> diff --git a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl index 3ae1fbe603a5..6a5dd4151d5a 100644 --- a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl @@ -178,8 +178,7 @@ amqp_credit_multiple_grants(Config) -> %% Let's ack all of them. ok = amqp10_client_session:disposition( - Session, - receiver, + Receiver, amqp10_msg:delivery_id(M1), amqp10_msg:delivery_id(M4), true, @@ -226,8 +225,7 @@ amqp_credit_multiple_grants(Config) -> %% Let's ack them all. ok = amqp10_client_session:disposition( - Session, - receiver, + Receiver, amqp10_msg:delivery_id(M5), amqp10_msg:delivery_id(M11), true, diff --git a/moduleindex.yaml b/moduleindex.yaml index 7b632d7213a9..2c0451c094f8 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -3,6 +3,23 @@ accept: - accept_header - accept_neg - accept_parser +amqp10_client: +- amqp10_client +- amqp10_client_app +- amqp10_client_connection +- amqp10_client_connection_sup +- amqp10_client_frame_reader +- amqp10_client_session +- amqp10_client_sessions_sup +- amqp10_client_sup +- amqp10_client_types +- amqp10_msg +amqp10_common: +- amqp10_binary_generator +- amqp10_binary_parser +- amqp10_framing +- amqp10_framing0 +- serial_number amqp_client: - amqp_auth_mechanisms - amqp_channel @@ -28,23 +45,6 @@ amqp_client: - amqp_util - rabbit_routing_util - uri_parser -amqp10_client: -- amqp10_client -- amqp10_client_app -- amqp10_client_connection -- amqp10_client_connection_sup -- amqp10_client_frame_reader -- amqp10_client_session -- amqp10_client_sessions_sup -- amqp10_client_sup -- amqp10_client_types -- amqp10_msg -amqp10_common: -- amqp10_binary_generator -- amqp10_binary_parser -- amqp10_framing -- amqp10_framing0 -- serial_number aten: - aten - aten_app