diff --git a/deps/amqp10_client/Makefile b/deps/amqp10_client/Makefile index 7dd0f4c657a5..b42ca3017e45 100644 --- a/deps/amqp10_client/Makefile +++ b/deps/amqp10_client/Makefile @@ -51,9 +51,6 @@ include erlang.mk HEX_TARBALL_FILES += rabbitmq-components.mk \ git-revisions.txt -# Dialyze the tests. -DIALYZER_OPTS += --src -r test - # -------------------------------------------------------------------- # ActiveMQ for the testsuite. # -------------------------------------------------------------------- diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index c9939bc263e4..32f91a5f7aea 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -301,16 +301,19 @@ attach_link(Session, AttachArgs) -> %% This is asynchronous and will notify completion of the attach request to the %% caller using an amqp10_event of the following format: %% {amqp10_event, {link, LinkRef, {detached, Why}}} --spec detach_link(link_ref()) -> _. +-spec detach_link(link_ref()) -> ok | {error, term()}. detach_link(#link_ref{link_handle = Handle, session = Session}) -> amqp10_client_session:detach(Session, Handle). -%% @doc Grant credit to a sender. -%% The amqp10_client will automatically grant Credit to the sender when -%% the remaining link credit falls below the value of RenewWhenBelow. -%% If RenewWhenBelow is 'never' the client will never grant more credit. Instead -%% the caller will be notified when the link_credit reaches 0 with an -%% amqp10_event of the following format: +%% @doc Grant Credit to a sender. +%% +%% In addition, if RenewWhenBelow is an integer, the amqp10_client will automatically grant more +%% Credit to the sender when the sum of the remaining link credit and the number of unsettled +%% messages falls below the value of RenewWhenBelow. +%% `Credit + RenewWhenBelow - 1` is the maximum number of in-flight unsettled messages. +%% +%% If RenewWhenBelow is `never` the amqp10_client will never grant more credit. Instead the caller +%% will be notified when the link_credit reaches 0 with an amqp10_event of the following format: %% {amqp10_event, {link, LinkRef, credit_exhausted}} -spec flow_link_credit(link_ref(), Credit :: non_neg_integer(), RenewWhenBelow :: never | pos_integer()) -> ok. @@ -323,10 +326,16 @@ flow_link_credit(Ref, Credit, RenewWhenBelow) -> flow_link_credit(#link_ref{role = receiver, session = Session, link_handle = Handle}, Credit, RenewWhenBelow, Drain) - when RenewWhenBelow =:= never orelse + when + %% Drain together with auto renewal doesn't make sense, so disallow it in the API. + ((Drain) andalso RenewWhenBelow =:= never + orelse not(Drain)) + andalso + %% Check that the RenewWhenBelow value make sense. + (RenewWhenBelow =:= never orelse is_integer(RenewWhenBelow) andalso RenewWhenBelow > 0 andalso - RenewWhenBelow =< Credit -> + RenewWhenBelow =< Credit) -> Flow = #'v1_0.flow'{link_credit = {uint, Credit}, drain = Drain}, ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow). @@ -359,11 +368,10 @@ accept_msg(LinkRef, Msg) -> %% the chosen delivery state. -spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(), amqp10_client_types:delivery_state()) -> ok. -settle_msg(#link_ref{role = receiver, - session = Session}, Msg, Settlement) -> +settle_msg(LinkRef, Msg, Settlement) -> DeliveryId = amqp10_msg:delivery_id(Msg), - amqp10_client_session:disposition(Session, receiver, DeliveryId, - DeliveryId, true, Settlement). + amqp10_client_session:disposition(LinkRef, DeliveryId, DeliveryId, true, Settlement). + %% @doc Get a single message from a link. %% Flows a single link credit then awaits delivery or timeout. -spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}. diff --git a/deps/amqp10_client/src/amqp10_client.hrl b/deps/amqp10_client/src/amqp10_client.hrl index 137e82552199..99cad7578300 100644 --- a/deps/amqp10_client/src/amqp10_client.hrl +++ b/deps/amqp10_client/src/amqp10_client.hrl @@ -20,4 +20,5 @@ -record(link_ref, {role :: sender | receiver, session :: pid(), + %% locally chosen output handle link_handle :: non_neg_integer()}). diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index e728f4f5ce05..7b1cba641d76 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -21,7 +21,7 @@ detach/2, transfer/3, flow/4, - disposition/6 + disposition/5 ]). %% Private API @@ -131,8 +131,9 @@ available = 0 :: non_neg_integer(), drain = false :: boolean(), partial_transfers :: undefined | {#'v1_0.transfer'{}, [binary()]}, - auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()} - }). + auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()}, + incoming_unsettled = #{} :: #{delivery_number() => ok} + }). -record(state, {channel :: pos_integer(), @@ -155,7 +156,6 @@ connection_config :: amqp10_client_connection:connection_config(), outgoing_delivery_id = ?INITIAL_OUTGOING_DELIVERY_ID :: delivery_number(), outgoing_unsettled = #{} :: #{delivery_number() => {amqp10_msg:delivery_tag(), Notify :: pid()}}, - incoming_unsettled = #{} :: #{delivery_number() => output_handle()}, notify :: pid() }). @@ -204,14 +204,18 @@ transfer(Session, Amqp10Msg, Timeout) -> flow(Session, Handle, Flow, RenewWhenBelow) -> gen_statem:cast(Session, {flow_link, Handle, Flow, RenewWhenBelow}). --spec disposition(pid(), link_role(), delivery_number(), delivery_number(), boolean(), +%% Sending a disposition on a sender link (with receiver-settle-mode = second) +%% is currently unsupported. +-spec disposition(link_ref(), delivery_number(), delivery_number(), boolean(), amqp10_client_types:delivery_state()) -> ok. -disposition(Session, Role, First, Last, Settled, DeliveryState) -> - gen_statem:call(Session, {disposition, Role, First, Last, Settled, +disposition(#link_ref{role = receiver, + session = Session, + link_handle = Handle}, + First, Last, Settled, DeliveryState) -> + gen_statem:call(Session, {disposition, Handle, First, Last, Settled, DeliveryState}, ?TIMEOUT). - %% ------------------------------------------------------------------- %% Private API. %% ------------------------------------------------------------------- @@ -277,7 +281,7 @@ mapped(cast, 'end', State) -> send_end(State), {next_state, end_sent, State}; mapped(cast, {flow_link, OutHandle, Flow0, RenewWhenBelow}, State0) -> - State = send_flow_link(fun send/2, OutHandle, Flow0, RenewWhenBelow, State0), + State = send_flow_link(OutHandle, Flow0, RenewWhenBelow, State0), {keep_state, State}; mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, IncomingWindow}}}, #state{next_incoming_id = NII, @@ -367,45 +371,43 @@ mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle}, State = book_partial_transfer_received( State0#state{links = Links#{OutHandle => Link1}}), {keep_state, State}; -mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle}, - delivery_id = MaybeDeliveryId, - settled = Settled} = Transfer0, Payload0}, - #state{incoming_unsettled = Unsettled0} = State0) -> - +mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}}, + Payload0}, State0) -> {ok, #link{target = {pid, TargetPid}, - output_handle = OutHandle, - ref = LinkRef} = Link0} = - find_link_by_input_handle(InHandle, State0), + ref = LinkRef, + incoming_unsettled = Unsettled + } = Link0} = find_link_by_input_handle(InHandle, State0), - {Transfer, Payload, Link1} = complete_partial_transfer(Transfer0, Payload0, Link0), - Msg = decode_as_msg(Transfer, Payload), - - % stash the DeliveryId - not sure for what yet - Unsettled = case MaybeDeliveryId of - {uint, DeliveryId} when Settled =/= true -> - Unsettled0#{DeliveryId => OutHandle}; - _ -> - Unsettled0 - end, + {Transfer = #'v1_0.transfer'{settled = Settled, + delivery_id = {uint, DeliveryId}}, + Payload, Link1} = complete_partial_transfer(Transfer0, Payload0, Link0), + Msg = decode_as_msg(Transfer, Payload), + Link2 = case Settled of + true -> + Link1; + _ -> + %% "If not set on the first (or only) transfer for a (multi-transfer) delivery, + %% then the settled flag MUST be interpreted as being false." [2.7.5] + Link1#link{incoming_unsettled = Unsettled#{DeliveryId => ok}} + end, % link bookkeeping % notify when credit is exhausted (link_credit = 0) % detach the Link with a transfer-limit-exceeded error code if further % transfers are received - State1 = State0#state{incoming_unsettled = Unsettled}, - case book_transfer_received(State1, Link1) of - {ok, Link2, State2} -> + case book_transfer_received(State0, Link2) of + {ok, Link3, State1} -> % deliver TargetPid ! {amqp10_msg, LinkRef, Msg}, - State = auto_flow(Link2, State2), + State = auto_flow(Link3, State1), {keep_state, State}; - {credit_exhausted, Link2, State} -> + {credit_exhausted, Link3, State} -> TargetPid ! {amqp10_msg, LinkRef, Msg}, - notify_credit_exhausted(Link2), + notify_credit_exhausted(Link3), {keep_state, State}; - {transfer_limit_exceeded, Link2, State} -> - logger:warning("transfer_limit_exceeded for link ~tp", [Link2]), - Link = detach_with_error_cond(Link2, State, + {transfer_limit_exceeded, Link3, State} -> + logger:warning("transfer_limit_exceeded for link ~tp", [Link3]), + Link = detach_with_error_cond(Link3, State, ?V_1_0_LINK_ERROR_TRANSFER_LIMIT_EXCEEDED), {keep_state, update_link(Link, State)} end; @@ -501,12 +503,15 @@ mapped({call, From}, end; mapped({call, From}, - {disposition, Role, First, Last, Settled0, DeliveryState}, - #state{incoming_unsettled = Unsettled0} = State0) -> + {disposition, OutputHandle, First, Last, Settled0, DeliveryState}, + #state{links = Links} = State0) -> + #{OutputHandle := Link0 = #link{incoming_unsettled = Unsettled0}} = Links, Unsettled = serial_number:foldl(fun maps:remove/2, Unsettled0, First, Last), - State = State0#state{incoming_unsettled = Unsettled}, + Link = Link0#link{incoming_unsettled = Unsettled}, + State1 = State0#state{links = Links#{OutputHandle := Link}}, + State = auto_flow(Link, State1), Disposition = #'v1_0.disposition'{ - role = translate_role(Role), + role = translate_role(receiver), first = {uint, First}, last = {uint, Last}, settled = Settled0, @@ -599,7 +604,7 @@ send_transfer(Transfer0, Parts0, MaxMessageSize, #state{socket = Socket, {ok, length(Frames)} end. -send_flow_link(Send, OutHandle, +send_flow_link(OutHandle, #'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewWhenBelow, #state{links = Links, next_incoming_id = NII, @@ -625,7 +630,7 @@ send_flow_link(Send, OutHandle, %% initial attach frame from the sender this field MUST NOT be set." [2.7.4] delivery_count = maybe_uint(DeliveryCount), available = uint(Available)}, - ok = Send(Flow, State), + ok = send(Flow, State), State#state{links = Links#{OutHandle => Link#link{link_credit = Credit, auto_flow = AutoFlow}}}. @@ -777,8 +782,9 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _}, max_message_size = MaxMessageSize}, ok = Send(Attach, State), + LinkRef = make_link_ref(element(1, Role), self(), OutHandle), Link = #link{name = Name, - ref = make_link_ref(element(1, Role), self(), OutHandle), + ref = LinkRef, output_handle = OutHandle, state = attach_sent, role = element(1, Role), @@ -790,7 +796,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _}, {State#state{links = Links#{OutHandle => Link}, next_link_handle = NextLinkHandle, - link_index = LinkIndex#{Name => OutHandle}}, Link#link.ref}. + link_index = LinkIndex#{Name => OutHandle}}, LinkRef}. -spec handle_session_flow(#'v1_0.flow'{}, #state{}) -> #state{}. handle_session_flow(#'v1_0.flow'{next_incoming_id = MaybeNII, @@ -908,7 +914,6 @@ translate_delivery_state({modified, translate_delivery_state(released) -> #'v1_0.released'{}; translate_delivery_state(received) -> #'v1_0.received'{}. -translate_role(sender) -> false; translate_role(receiver) -> true. maybe_notify_link_credit(#link{role = sender, @@ -987,9 +992,11 @@ book_transfer_received(#state{next_incoming_id = NID, auto_flow(#link{link_credit = LC, auto_flow = {auto, RenewWhenBelow, Credit}, - output_handle = OutHandle}, State) - when LC < RenewWhenBelow -> - send_flow_link(fun send/2, OutHandle, + output_handle = OutHandle, + incoming_unsettled = Unsettled}, + State) + when LC + map_size(Unsettled) < RenewWhenBelow -> + send_flow_link(OutHandle, #'v1_0.flow'{link_credit = {uint, Credit}}, RenewWhenBelow, State); auto_flow(_, State) -> @@ -1045,7 +1052,8 @@ socket_send0({tcp, Socket}, Data) -> socket_send0({ssl, Socket}, Data) -> ssl:send(Socket, Data). --spec make_link_ref(_, _, _) -> link_ref(). +-spec make_link_ref(link_role(), pid(), output_handle()) -> + link_ref(). make_link_ref(Role, Session, Handle) -> #link_ref{role = Role, session = Session, link_handle = Handle}. @@ -1100,7 +1108,6 @@ format_status(Status = #{data := Data0}) -> connection_config = ConnectionConfig, outgoing_delivery_id = OutgoingDeliveryId, outgoing_unsettled = OutgoingUnsettled, - incoming_unsettled = IncomingUnsettled, notify = Notify } = Data0, Links = maps:map( @@ -1119,7 +1126,8 @@ format_status(Status = #{data := Data0}) -> available = Available, drain = Drain, partial_transfers = PartialTransfers0, - auto_flow = AutoFlow + auto_flow = AutoFlow, + incoming_unsettled = IncomingUnsettled }) -> PartialTransfers = case PartialTransfers0 of undefined -> @@ -1141,7 +1149,9 @@ format_status(Status = #{data := Data0}) -> available => Available, drain => Drain, partial_transfers => PartialTransfers, - auto_flow => AutoFlow} + auto_flow => AutoFlow, + incoming_unsettled => maps:size(IncomingUnsettled) + } end, Links0), Data = #{channel => Channel, remote_channel => RemoteChannel, @@ -1160,7 +1170,6 @@ format_status(Status = #{data := Data0}) -> connection_config => maps:remove(sasl, ConnectionConfig), outgoing_delivery_id => OutgoingDeliveryId, outgoing_unsettled => maps:size(OutgoingUnsettled), - incoming_unsettled => maps:size(IncomingUnsettled), notify => Notify}, Status#{data := Data}. diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 2147b0f156ce..62a7718657ef 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -514,16 +514,17 @@ subscribe(Config) -> <<"sub-receiver">>, QueueName, unsettled), ok = amqp10_client:flow_link_credit(Receiver, 10, never), - - _ = receive_messages(Receiver, 10), - % assert no further messages are delivered - timeout = receive_one(Receiver), - receive - {amqp10_event, {link, Receiver, credit_exhausted}} -> - ok - after 5000 -> - flush(), - exit(credit_exhausted_assert) + [begin + receive {amqp10_msg, Receiver, Msg} -> + ok = amqp10_client:accept_msg(Receiver, Msg) + after 2000 -> ct:fail(timeout) + end + end || _ <- lists:seq(1, 10)], + ok = assert_no_message(Receiver), + + receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok + after 5000 -> flush(), + exit(credit_exhausted_assert) end, ok = amqp10_client:end_session(Session), @@ -539,16 +540,121 @@ subscribe_with_auto_flow(Config) -> <<"sub-sender">>, QueueName), await_link(Sender, credited, link_credit_timeout), - _ = publish_messages(Sender, <<"banana">>, 10), - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, - <<"sub-receiver">>, - QueueName, unsettled), - ok = amqp10_client:flow_link_credit(Receiver, 5, 2), - - _ = receive_messages(Receiver, 10), - % assert no further messages are delivered - timeout = receive_one(Receiver), + _ = publish_messages(Sender, <<"banana">>, 20), + %% Use sender settle mode 'settled'. + {ok, R1} = amqp10_client:attach_receiver_link( + Session, <<"sub-receiver-1">>, QueueName, settled), + await_link(R1, attached, attached_timeout), + ok = amqp10_client:flow_link_credit(R1, 5, 2), + ?assertEqual(20, count_received_messages(R1)), + ok = amqp10_client:detach_link(R1), + + _ = publish_messages(Sender, <<"banana">>, 30), + %% Use sender settle mode 'unsettled'. + %% This should require us to manually settle message in order to receive more messages. + {ok, R2} = amqp10_client:attach_receiver_link(Session, <<"sub-receiver-2">>, QueueName, unsettled), + await_link(R2, attached, attached_timeout), + ok = amqp10_client:flow_link_credit(R2, 5, 2), + %% We should receive exactly 5 messages. + [M1, _M2, M3, M4, M5] = receive_messages(R2, 5), + ok = assert_no_message(R2), + + %% Even when we accept the first 3 messages, the number of unsettled messages has not yet fallen below 2. + %% Therefore, the client should not yet grant more credits to the sender. + ok = amqp10_client_session:disposition( + R2, amqp10_msg:delivery_id(M1), amqp10_msg:delivery_id(M3), true, accepted), + ok = assert_no_message(R2), + + %% When we accept 1 more message (the order in which we accept shouldn't matter, here we accept M5 before M4), + %% the number of unsettled messages now falls below 2 (since only M4 is left unsettled). + %% Therefore, the client should grant 5 credits to the sender. + %% Therefore, we should receive 5 more messages. + ok = amqp10_client:accept_msg(R2, M5), + [_M6, _M7, _M8, _M9, M10] = receive_messages(R2, 5), + ok = assert_no_message(R2), + + %% It shouldn't matter how we settle messages, therefore we use 'rejected' this time. + %% Settling all in flight messages should cause us to receive exactly 5 more messages. + ok = amqp10_client_session:disposition( + R2, amqp10_msg:delivery_id(M4), amqp10_msg:delivery_id(M10), true, rejected), + [M11, _M12, _M13, _M14, M15] = receive_messages(R2, 5), + ok = assert_no_message(R2), + + %% Dynamically decrease link credit. + %% Since we explicitly tell to grant 3 new credits now, we expect to receive 3 more messages. + ok = amqp10_client:flow_link_credit(R2, 3, 3), + [M16, _M17, M18] = receive_messages(R2, 3), + ok = assert_no_message(R2), + + ok = amqp10_client_session:disposition( + R2, amqp10_msg:delivery_id(M11), amqp10_msg:delivery_id(M15), true, accepted), + %% However, the RenewWhenBelow=3 still refers to all unsettled messages. + %% Right now we have 3 messages (M16, M17, M18) unsettled. + ok = assert_no_message(R2), + + %% Settling 1 out of these 3 messages causes RenewWhenBelow to fall below 3 resulting + %% in 3 new messages to be received. + ok = amqp10_client:accept_msg(R2, M18), + [_M19, _M20, _M21] = receive_messages(R2, 3), + ok = assert_no_message(R2), + + ok = amqp10_client:flow_link_credit(R2, 3, never, true), + [_M22, _M23, M24] = receive_messages(R2, 3), + ok = assert_no_message(R2), + + %% Since RenewWhenBelow = never, we expect to receive no new messages despite settling. + ok = amqp10_client_session:disposition( + R2, amqp10_msg:delivery_id(M16), amqp10_msg:delivery_id(M24), true, rejected), + ok = assert_no_message(R2), + + ok = amqp10_client:flow_link_credit(R2, 2, never, false), + [M25, _M26] = receive_messages(R2, 2), + ok = assert_no_message(R2), + + ok = amqp10_client:flow_link_credit(R2, 3, 3), + [_M27, _M28, M29] = receive_messages(R2, 3), + ok = assert_no_message(R2), + + ok = amqp10_client_session:disposition( + R2, amqp10_msg:delivery_id(M25), amqp10_msg:delivery_id(M29), true, accepted), + [M30] = receive_messages(R2, 1), + ok = assert_no_message(R2), + ok = amqp10_client:accept_msg(R2, M30), + %% The sender queue is empty now. + ok = assert_no_message(R2), + + ok = amqp10_client:flow_link_credit(R2, 3, 1), + _ = publish_messages(Sender, <<"banana">>, 1), + [M31] = receive_messages(R2, 1), + ok = amqp10_client:accept_msg(R2, M31), + + %% Since function flow_link_credit/3 documents + %% "if RenewWhenBelow is an integer, the amqp10_client will automatically grant more + %% Credit to the sender when the sum of the remaining link credit and the number of + %% unsettled messages falls below the value of RenewWhenBelow." + %% our expectation is that the amqp10_client has not renewed credit since the sum of + %% remaining link credit (2) and unsettled messages (0) is 2. + %% + %% Therefore, when we publish another 3 messages, we expect to only receive only 2 messages! + _ = publish_messages(Sender, <<"banana">>, 5), + [M32, M33] = receive_messages(R2, 2), + ok = assert_no_message(R2), + + %% When we accept both messages, the sum of the remaining link credit (0) and unsettled messages (0) + %% falls below RenewWhenBelow=1 causing the amqp10_client to grant 3 new credits. + ok = amqp10_client:accept_msg(R2, M32), + ok = assert_no_message(R2), + ok = amqp10_client:accept_msg(R2, M33), + + [M35, M36, M37] = receive_messages(R2, 3), + ok = amqp10_client:accept_msg(R2, M35), + ok = amqp10_client:accept_msg(R2, M36), + ok = amqp10_client:accept_msg(R2, M37), + %% The sender queue is empty now. + ok = assert_no_message(R2), + + ok = amqp10_client:detach_link(R2), ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). @@ -703,11 +809,19 @@ incoming_heartbeat(Config) -> %%% HELPERS %%% -receive_messages(Receiver, Num) -> - [begin - ct:pal("receive_messages ~tp", [T]), - ok = receive_one(Receiver) - end || T <- lists:seq(1, Num)]. +await_link(Who, What, Err) -> + receive + {amqp10_event, {link, Who0, What0}} + when Who0 =:= Who andalso + What0 =:= What -> + ok; + {amqp10_event, {link, Who0, {detached, Why}}} + when Who0 =:= Who -> + exit(Why) + after 5000 -> + flush(), + exit(Err) + end. publish_messages(Sender, Data, Num) -> [begin @@ -717,36 +831,42 @@ publish_messages(Sender, Data, Num) -> ok = await_disposition(Tag) end || T <- lists:seq(1, Num)]. -receive_one(Receiver) -> - receive - {amqp10_msg, Receiver0, Msg} - when Receiver0 =:= Receiver -> - amqp10_client:accept_msg(Receiver, Msg) - after 2000 -> - timeout - end. - await_disposition(DeliveryTag) -> receive {amqp10_disposition, {accepted, DeliveryTag0}} when DeliveryTag0 =:= DeliveryTag -> ok after 3000 -> flush(), - exit(dispostion_timeout) + ct:fail(dispostion_timeout) end. -await_link(Who, What, Err) -> +count_received_messages(Receiver) -> + count_received_messages0(Receiver, 0). + +count_received_messages0(Receiver, Count) -> receive - {amqp10_event, {link, Who0, What0}} - when Who0 =:= Who andalso - What0 =:= What -> - ok; - {amqp10_event, {link, Who0, {detached, Why}}} - when Who0 =:= Who -> - exit(Why) - after 5000 -> - flush(), - exit(Err) + {amqp10_msg, Receiver, _Msg} -> + count_received_messages0(Receiver, Count + 1) + after 200 -> + Count + end. + +receive_messages(Receiver, N) -> + receive_messages0(Receiver, N, []). + +receive_messages0(_Receiver, 0, Acc) -> + lists:reverse(Acc); +receive_messages0(Receiver, N, Acc) -> + receive + {amqp10_msg, Receiver, Msg} -> + receive_messages0(Receiver, N - 1, [Msg | Acc]) + after 5000 -> + ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}}) + end. + +assert_no_message(Receiver) -> + receive {amqp10_msg, Receiver, Msg} -> ct:fail({unexpected_message, Msg}) + after 50 -> ok end. to_bin(X) when is_list(X) -> diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 884f54033a9f..c450ca06e346 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -492,7 +492,7 @@ receiver_settle_mode_first(Config) -> ?assertEqual(DeliveryIdMsg9, serial_number_increment(DeliveryIdMsg8)), Last1 = serial_number_increment(serial_number_increment(DeliveryIdMsg9)), ok = amqp10_client_session:disposition( - Session, receiver, DeliveryIdMsg8, Last1, true, accepted), + Receiver, DeliveryIdMsg8, Last1, true, accepted), assert_messages(QName, 8, 7, Config), %% 2. Ack a range smaller than the number of unacked messages where all delivery IDs @@ -501,7 +501,7 @@ receiver_settle_mode_first(Config) -> DeliveryIdMsg4 = amqp10_msg:delivery_id(Msg4), DeliveryIdMsg6 = amqp10_msg:delivery_id(Msg6), ok = amqp10_client_session:disposition( - Session, receiver, DeliveryIdMsg4, DeliveryIdMsg6, true, accepted), + Receiver, DeliveryIdMsg4, DeliveryIdMsg6, true, accepted), assert_messages(QName, 5, 4, Config), %% 3. Ack a range larger than the number of unacked messages where all delivery IDs @@ -509,7 +509,7 @@ receiver_settle_mode_first(Config) -> DeliveryIdMsg2 = amqp10_msg:delivery_id(Msg2), DeliveryIdMsg7 = amqp10_msg:delivery_id(Msg7), ok = amqp10_client_session:disposition( - Session, receiver, DeliveryIdMsg2, DeliveryIdMsg7, true, accepted), + Receiver, DeliveryIdMsg2, DeliveryIdMsg7, true, accepted), assert_messages(QName, 2, 1, Config), %% Consume the last message. @@ -523,16 +523,16 @@ receiver_settle_mode_first(Config) -> DeliveryIdMsg10 = amqp10_msg:delivery_id(Msg10), Last2 = serial_number_increment(DeliveryIdMsg10), ok = amqp10_client_session:disposition( - Session, receiver, DeliveryIdMsg1, Last2, true, accepted), + Receiver, DeliveryIdMsg1, Last2, true, accepted), assert_messages(QName, 0, 0, Config), %% 5. Ack single delivery ID when there are no unacked messages. ok = amqp10_client_session:disposition( - Session, receiver, DeliveryIdMsg1, DeliveryIdMsg1, true, accepted), + Receiver, DeliveryIdMsg1, DeliveryIdMsg1, true, accepted), %% 6. Ack multiple delivery IDs when there are no unacked messages. ok = amqp10_client_session:disposition( - Session, receiver, DeliveryIdMsg1, DeliveryIdMsg6, true, accepted), + Receiver, DeliveryIdMsg1, DeliveryIdMsg6, true, accepted), assert_messages(QName, 0, 0, Config), ok = amqp10_client:detach_link(Receiver), @@ -684,7 +684,7 @@ amqp_stream_amqpl(Config) -> #amqp_msg{props = #'P_basic'{type = <<"amqp-1.0">>}}} -> ok after 5000 -> - exit(basic_deliver_timeout) + ct:fail(basic_deliver_timeout) end, #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -1736,7 +1736,7 @@ detach_requeues(Config) -> %% Receiver2 accepts all 4 messages. ok = amqp10_client_session:disposition( - Session, receiver, + Receiver2, amqp10_msg:delivery_id(Msg2), amqp10_msg:delivery_id(Msg3b), true, accepted), @@ -2328,29 +2328,20 @@ async_notify(SenderSettleMode, QType, Config) -> end, %% Initially, grant 10 credits to the sending queue. - %% Whenever credits drops below 5, renew back to 10. + %% Whenever the sum of credits and number of unsettled messages drops below 5, renew back to 10. ok = amqp10_client:flow_link_credit(Receiver, 10, 5), %% We should receive all messages. - Msgs = receive_messages(Receiver, NumMsgs), + Accept = case SenderSettleMode of + settled -> false; + unsettled -> true + end, + Msgs = receive_all_messages(Receiver, Accept), FirstMsg = hd(Msgs), LastMsg = lists:last(Msgs), ?assertEqual([<<"1">>], amqp10_msg:body(FirstMsg)), ?assertEqual([integer_to_binary(NumMsgs)], amqp10_msg:body(LastMsg)), - case SenderSettleMode of - settled -> - ok; - unsettled -> - ok = amqp10_client_session:disposition( - Session, - receiver, - amqp10_msg:delivery_id(FirstMsg), - amqp10_msg:delivery_id(LastMsg), - true, - accepted) - end, - %% No further messages should be delivered. receive Unexpected -> ct:fail({received_unexpected_message, Unexpected}) after 50 -> ok @@ -2503,8 +2494,7 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) ?assertEqual([<<"1">>], amqp10_msg:body(FirstMsg)), ?assertEqual([integer_to_binary(NumMsgs)], amqp10_msg:body(LastMsg)), ok = amqp10_client_session:disposition( - Session, - receiver, + Receiver, amqp10_msg:delivery_id(FirstMsg), amqp10_msg:delivery_id(LastMsg), true, @@ -2803,7 +2793,7 @@ stream_filtering(Config) -> #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, <<"rabbitmq:stream-filter">> => <<"apple">>}), ok = amqp10_client:flow_link_credit(AppleReceiver, 100, 10), - AppleMessages = receive_all_messages(AppleReceiver, []), + AppleMessages = receive_all_messages(AppleReceiver, true), %% we should get less than all the waves combined ?assert(length(AppleMessages) < WaveCount * 3), %% client-side filtering @@ -2824,7 +2814,7 @@ stream_filtering(Config) -> #{<<"rabbitmq:stream-offset-spec">> => <<"first">>, <<"rabbitmq:stream-filter">> => [<<"apple">>, <<"orange">>]}), ok = amqp10_client:flow_link_credit(AppleOrangeReceiver, 100, 10), - AppleOrangeMessages = receive_all_messages(AppleOrangeReceiver, []), + AppleOrangeMessages = receive_all_messages(AppleOrangeReceiver, true), %% we should get less than all the waves combined ?assert(length(AppleOrangeMessages) < WaveCount * 3), %% client-side filtering @@ -2848,7 +2838,7 @@ stream_filtering(Config) -> <<"rabbitmq:stream-match-unfiltered">> => {boolean, true}}), ok = amqp10_client:flow_link_credit(AppleUnfilteredReceiver, 100, 10), - AppleUnfilteredMessages = receive_all_messages(AppleUnfilteredReceiver, []), + AppleUnfilteredMessages = receive_all_messages(AppleUnfilteredReceiver, true), %% we should get less than all the waves combined ?assert(length(AppleUnfilteredMessages) < WaveCount * 3), %% client-side filtering @@ -3351,10 +3341,16 @@ classic_priority_queue(Config) -> %% internal %% -receive_all_messages(Receiver, Acc) -> +receive_all_messages(Receiver, Accept) -> + receive_all_messages0(Receiver, Accept, []). + +receive_all_messages0(Receiver, Accept, Acc) -> receive {amqp10_msg, Receiver, Msg} -> - ok = amqp10_client:accept_msg(Receiver, Msg), - receive_all_messages(Receiver, [Msg | Acc]) + case Accept of + true -> ok = amqp10_client:accept_msg(Receiver, Msg); + false -> ok + end, + receive_all_messages0(Receiver, Accept, [Msg | Acc]) after 500 -> lists:reverse(Acc) end. @@ -3501,7 +3497,7 @@ receive_messages0(Receiver, N, Acc) -> {amqp10_msg, Receiver, Msg} -> receive_messages0(Receiver, N - 1, [Msg | Acc]) after 5000 -> - exit({timeout, {num_received, length(Acc)}, {num_missing, N}}) + ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}}) end. count_received_messages(Receiver) -> diff --git a/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl b/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl index b9f053db6ce9..fb46a7def31f 100644 --- a/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl +++ b/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl @@ -121,8 +121,8 @@ credit_api_v2(Config) -> filter => #{}}, {ok, QQReceiver1} = amqp10_client:attach_link(Session, QQAttachArgs), - ok = consume_and_accept(10, CQReceiver1, Session), - ok = consume_and_accept(10, QQReceiver1, Session), + ok = consume_and_accept(10, CQReceiver1), + ok = consume_and_accept(10, QQReceiver1), ?assertEqual(ok, rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FUNCTION_NAME)), @@ -133,12 +133,12 @@ credit_api_v2(Config) -> Session, <<"cq receiver 2">>, CQAddr, unsettled), {ok, QQReceiver2} = amqp10_client:attach_receiver_link( Session, <<"qq receiver 2">>, QQAddr, unsettled), - ok = consume_and_accept(10, CQReceiver2, Session), - ok = consume_and_accept(10, QQReceiver2, Session), + ok = consume_and_accept(10, CQReceiver2), + ok = consume_and_accept(10, QQReceiver2), %% Consume via with credit API v1 - ok = consume_and_accept(10, CQReceiver1, Session), - ok = consume_and_accept(10, QQReceiver1, Session), + ok = consume_and_accept(10, CQReceiver1), + ok = consume_and_accept(10, QQReceiver1), %% Detach the credit API v1 links and attach with the same output handle. ok = detach_sync(CQReceiver1), @@ -147,8 +147,8 @@ credit_api_v2(Config) -> {ok, QQReceiver3} = amqp10_client:attach_link(Session, QQAttachArgs), %% The new links should use credit API v2 - ok = consume_and_accept(10, CQReceiver3, Session), - ok = consume_and_accept(10, QQReceiver3, Session), + ok = consume_and_accept(10, CQReceiver3), + ok = consume_and_accept(10, QQReceiver3), flush(pre_drain), %% Draining should also work. @@ -181,12 +181,11 @@ credit_api_v2(Config) -> after 5000 -> ct:fail(missing_closed) end. -consume_and_accept(NumMsgs, Receiver, Session) -> +consume_and_accept(NumMsgs, Receiver) -> ok = amqp10_client:flow_link_credit(Receiver, NumMsgs, never), Msgs = receive_messages(Receiver, NumMsgs), ok = amqp10_client_session:disposition( - Session, - receiver, + Receiver, amqp10_msg:delivery_id(hd(Msgs)), amqp10_msg:delivery_id(lists:last(Msgs)), true, diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index 147626de7f72..492fd535d959 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -283,30 +283,24 @@ close_dest(#{dest := #{current := #{conn := Conn}}}) -> close_dest(_Config) -> ok. -spec ack(Tag :: tag(), Multi :: boolean(), state()) -> state(). -ack(Tag, true, State = #{source := #{current := #{session := Session}, +ack(Tag, true, State = #{source := #{current := #{link := LinkRef}, last_acked_tag := LastTag} = Src}) -> First = LastTag + 1, - ok = amqp10_client_session:disposition(Session, receiver, First, - Tag, true, accepted), + ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, accepted), State#{source => Src#{last_acked_tag => Tag}}; -ack(Tag, false, State = #{source := #{current := - #{session := Session}} = Src}) -> - ok = amqp10_client_session:disposition(Session, receiver, Tag, - Tag, true, accepted), +ack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) -> + ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, accepted), State#{source => Src#{last_acked_tag => Tag}}. -spec nack(Tag :: tag(), Multi :: boolean(), state()) -> state(). -nack(Tag, false, State = #{source := - #{current := #{session := Session}} = Src}) -> +nack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) -> % the tag is the same as the deliveryid - ok = amqp10_client_session:disposition(Session, receiver, Tag, - Tag, false, rejected), + ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, rejected), State#{source => Src#{last_nacked_tag => Tag}}; -nack(Tag, true, State = #{source := #{current := #{session := Session}, - last_nacked_tag := LastTag} = Src}) -> +nack(Tag, true, State = #{source := #{current := #{link := LinkRef}, + last_nacked_tag := LastTag} = Src}) -> First = LastTag + 1, - ok = amqp10_client_session:disposition(Session, receiver, First, - Tag, true, accepted), + ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, rejected), State#{source => Src#{last_nacked_tag => Tag}}. status(#{dest := #{current := #{link_state := attached}}}) -> diff --git a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl index 3ae1fbe603a5..6a5dd4151d5a 100644 --- a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl @@ -178,8 +178,7 @@ amqp_credit_multiple_grants(Config) -> %% Let's ack all of them. ok = amqp10_client_session:disposition( - Session, - receiver, + Receiver, amqp10_msg:delivery_id(M1), amqp10_msg:delivery_id(M4), true, @@ -226,8 +225,7 @@ amqp_credit_multiple_grants(Config) -> %% Let's ack them all. ok = amqp10_client_session:disposition( - Session, - receiver, + Receiver, amqp10_msg:delivery_id(M5), amqp10_msg:delivery_id(M11), true, diff --git a/moduleindex.yaml b/moduleindex.yaml index 7b632d7213a9..2c0451c094f8 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -3,6 +3,23 @@ accept: - accept_header - accept_neg - accept_parser +amqp10_client: +- amqp10_client +- amqp10_client_app +- amqp10_client_connection +- amqp10_client_connection_sup +- amqp10_client_frame_reader +- amqp10_client_session +- amqp10_client_sessions_sup +- amqp10_client_sup +- amqp10_client_types +- amqp10_msg +amqp10_common: +- amqp10_binary_generator +- amqp10_binary_parser +- amqp10_framing +- amqp10_framing0 +- serial_number amqp_client: - amqp_auth_mechanisms - amqp_channel @@ -28,23 +45,6 @@ amqp_client: - amqp_util - rabbit_routing_util - uri_parser -amqp10_client: -- amqp10_client -- amqp10_client_app -- amqp10_client_connection -- amqp10_client_connection_sup -- amqp10_client_frame_reader -- amqp10_client_session -- amqp10_client_sessions_sup -- amqp10_client_sup -- amqp10_client_types -- amqp10_msg -amqp10_common: -- amqp10_binary_generator -- amqp10_binary_parser -- amqp10_framing -- amqp10_framing0 -- serial_number aten: - aten - aten_app