Skip to content

Commit

Permalink
QQ: fix bug with discards using a consumer_id()
Browse files Browse the repository at this point in the history
Fixes a pattern matching bug for discards that come in after a consumer
has been cancelled. Because the rabbit_fifo_client does not keep
the integer consumer key after cancellation, late acks, returns, and
discards use the full {CTag, Pid} consumer id version.

As this is a state machine change the machine version has been
increased to 5.

The same bug is present for the `modify` command also however as
AMQP does not allow late settlements we don't have to make this
fix conditional on the machine version as it cannot happen.
  • Loading branch information
kjnilsson committed Oct 3, 2024
1 parent bc1e0ad commit 2339401
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 49 deletions.
39 changes: 28 additions & 11 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,27 @@ apply(Meta, #settle{msg_ids = MsgIds,
_ ->
{State, ok}
end;
apply(Meta, #discard{consumer_key = ConsumerKey,
msg_ids = MsgIds},
apply(#{machine_version := 4} = Meta,
#discard{consumer_key = ConsumerKey,
msg_ids = MsgIds},
#?STATE{consumers = Consumers } = State0) ->
%% buggy version that would have not found the consumer if the ConsumerKey
%% was a consumer_id()
case find_consumer(ConsumerKey, Consumers) of
{ConsumerKey, #consumer{} = Con} ->
discard(Meta, MsgIds, ConsumerKey, Con, true, #{}, State0);
_ ->
{State0, ok}
end;
apply(Meta, #discard{consumer_key = ConsumerKey,
msg_ids = MsgIds},
#?STATE{consumers = Consumers } = State0) ->
case find_consumer(ConsumerKey, Consumers) of
{ActualConsumerKey, #consumer{} = Con} ->
discard(Meta, MsgIds, ActualConsumerKey, Con, true, #{}, State0);
_ ->
{State0, ok}
end;
apply(Meta, #return{consumer_key = ConsumerKey,
msg_ids = MsgIds},
#?STATE{consumers = Cons} = State) ->
Expand All @@ -291,13 +303,14 @@ apply(Meta, #modify{consumer_key = ConsumerKey,
msg_ids = MsgIds},
#?STATE{consumers = Cons} = State) ->
case find_consumer(ConsumerKey, Cons) of
{ConsumerKey, #consumer{checked_out = Checked}}
{ActualConsumerKey, #consumer{checked_out = Checked}}
when Undel == false ->
return(Meta, ConsumerKey, MsgIds, DelFailed,
return(Meta, ActualConsumerKey, MsgIds, DelFailed,
Anns, Checked, [], State);
{ConsumerKey, #consumer{} = Con}
{ActualConsumerKey, #consumer{} = Con}
when Undel == true ->
discard(Meta, MsgIds, ConsumerKey, Con, DelFailed, Anns, State);
discard(Meta, MsgIds, ActualConsumerKey,
Con, DelFailed, Anns, State);
_ ->
{State, ok}
end;
Expand Down Expand Up @@ -898,13 +911,14 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) ->
end.

-spec version() -> pos_integer().
version() -> 4.
version() -> 5.

which_module(0) -> rabbit_fifo_v0;
which_module(1) -> rabbit_fifo_v1;
which_module(2) -> rabbit_fifo_v3;
which_module(3) -> rabbit_fifo_v3;
which_module(4) -> ?MODULE.
which_module(4) -> ?MODULE;
which_module(5) -> ?MODULE.

-define(AUX, aux_v3).

Expand Down Expand Up @@ -2520,15 +2534,15 @@ make_checkout({_, _} = ConsumerId, Spec0, Meta) ->
make_settle(ConsumerKey, MsgIds) when is_list(MsgIds) ->
#settle{consumer_key = ConsumerKey, msg_ids = MsgIds}.

-spec make_return(consumer_id(), [msg_id()]) -> protocol().
-spec make_return(consumer_key(), [msg_id()]) -> protocol().
make_return(ConsumerKey, MsgIds) ->
#return{consumer_key = ConsumerKey, msg_ids = MsgIds}.

-spec is_return(protocol()) -> boolean().
is_return(Command) ->
is_record(Command, return).

-spec make_discard(consumer_id(), [msg_id()]) -> protocol().
-spec make_discard(consumer_key(), [msg_id()]) -> protocol().
make_discard(ConsumerKey, MsgIds) ->
#discard{consumer_key = ConsumerKey, msg_ids = MsgIds}.

Expand Down Expand Up @@ -2701,7 +2715,10 @@ convert(Meta, 1, To, State) ->
convert(Meta, 2, To, State) ->
convert(Meta, 3, To, rabbit_fifo_v3:convert_v2_to_v3(State));
convert(Meta, 3, To, State) ->
convert(Meta, 4, To, convert_v3_to_v4(Meta, State)).
convert(Meta, 4, To, convert_v3_to_v4(Meta, State));
convert(Meta, 4, To, State) ->
%% no conversion needed, this version only includes a logic change
convert(Meta, 5, To, State).

smallest_raft_index(#?STATE{messages = Messages,
ra_indexes = Indexes,
Expand Down
32 changes: 32 additions & 0 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ all_tests() ->
per_message_ttl_expiration_too_high,
consumer_priorities,
cancel_consumer_gh_3729,
cancel_consumer_gh_12424,
cancel_and_consume_with_same_tag,
validate_messages_on_queue,
amqpl_headers,
Expand Down Expand Up @@ -3600,6 +3601,37 @@ cancel_consumer_gh_3729(Config) ->

ok = rabbit_ct_client_helpers:close_channel(Ch).

cancel_consumer_gh_12424(Config) ->
QQ = ?config(queue_name, Config),

Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),

ExpectedDeclareRslt0 = #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 0},
DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
?assertMatch(ExpectedDeclareRslt0, DeclareRslt0),

ok = publish(Ch, QQ),

ok = subscribe(Ch, QQ, false),

DeliveryTag = receive
{#'basic.deliver'{delivery_tag = DT}, _} ->
DT
after 5000 ->
flush(100),
ct:fail("basic.deliver timeout")
end,

ok = cancel(Ch),

R = #'basic.reject'{delivery_tag = DeliveryTag, requeue = false},
ok = amqp_channel:cast(Ch, R),
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),

ok.

%% Test the scenario where a message is published to a quorum queue
cancel_and_consume_with_same_tag(Config) ->
%% https://github.com/rabbitmq/rabbitmq-server/issues/5927
QQ = ?config(queue_name, Config),
Expand Down
53 changes: 15 additions & 38 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ groups() ->
].

init_per_group(tests, Config) ->
[{machine_version, 4} | Config];
[{machine_version, 5} | Config];
init_per_group(machine_version_conversion, Config) ->
Config.

init_per_testcase(_Testcase, Config) ->
FF = ?config(machine_version, Config) == 4,
FF = ?config(machine_version, Config) == 5,
ok = meck:new(rabbit_feature_flags, [passthrough]),
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> FF end),
Config.
Expand Down Expand Up @@ -804,6 +804,19 @@ discarded_message_with_dead_letter_handler_emits_log_effect_test(Config) ->

ok.

discard_after_cancel_test(Config) ->
Cid = {?FUNCTION_NAME_B, self()},
{State0, _} = enq(Config, 1, 1, first, test_init(test)),
{State1, #{key := _CKey,
next_msg_id := MsgId}, _Effects1} =
checkout(Config, ?LINE, Cid, 10, State0),
{State2, _, _} = apply(meta(Config, ?LINE),
rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
{State, _, _} = apply(meta(Config, ?LINE),
rabbit_fifo:make_discard(Cid, [MsgId]), State2),
ct:pal("State ~p", [State]),
ok.

enqueued_msg_with_delivery_count_test(Config) ->
State00 = init(#{name => test,
queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
Expand Down Expand Up @@ -2786,45 +2799,9 @@ modify_test(Config) ->

ok.

ttb_test(Config) ->
S0 = init(#{name => ?FUNCTION_NAME,
queue_resource =>
rabbit_misc:r("/", queue, ?FUNCTION_NAME_B)}),


S1 = do_n(5_000_000,
fun (N, Acc) ->
I = (5_000_000 - N),
element(1, enq(Config, I, I, ?FUNCTION_NAME_B, Acc))
end, S0),



{T1, _Res} = timer:tc(fun () ->
do_n(100, fun (_, S) ->
term_to_binary(S),
S1 end, S1)
end),
ct:pal("T1 took ~bus", [T1]),


{T2, _} = timer:tc(fun () ->
do_n(100, fun (_, S) -> term_to_iovec(S), S1 end, S1)
end),
ct:pal("T2 took ~bus", [T2]),

ok.

%% Utility
%%

do_n(0, _, A) ->
A;
do_n(N, Fun, A0) ->
A = Fun(N, A0),
do_n(N-1, Fun, A).


init(Conf) -> rabbit_fifo:init(Conf).
make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid).
apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).
Expand Down

0 comments on commit 2339401

Please sign in to comment.