Skip to content

Commit

Permalink
Restore credit_flow between channel/MQTT connection -> CQ processes
Browse files Browse the repository at this point in the history
The credit_flow between publishing AMQP 0.9.1 channel (or MQTT
connection) and (non-mirrored) classic queue processes was
unintentionally removed in 4.0 together with anything else related to
CQ mirroring.

By default we restore the 3.x behaviour for non-mirored classic
queues. It is possible to disable flow-control (the earlier 4.0.x
behaviour) with the new env `classic_queue_flow_control`. In 3.x this
was possible with the config `mirroring_flow_control`.
  • Loading branch information
gomoripeti committed Dec 9, 2024
1 parent 7b7708f commit d65bd7d
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 4 deletions.
2 changes: 2 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ _APP_ENV = """[
{exit_on_close, false}
]},
{ssl_apps, [asn1, crypto, public_key, ssl]},
%% see rabbitmq-server#114
{classic_queue_flow_control, true},
%% see rabbitmq-server#227 and related tickets.
%% msg_store_credit_disc_bound only takes effect when
%% messages are persisted to the message store. If messages
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ define PROJECT_ENV
{exit_on_close, false}
]},
{ssl_apps, [asn1, crypto, public_key, ssl]},
%% see rabbitmq-server#114
{classic_queue_flow_control, true},
%% see rabbitmq-server#227 and related tickets.
%% msg_store_credit_disc_bound only takes effect when
%% messages are persisted to the message store. If messages
Expand Down
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@
rejected,
%% used by "one shot RPC" (amq.
reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()},
delivery_flow, %% Deprecated since removal of CMQ in 4.0
%% see rabbitmq-server#114
delivery_flow :: flow | noflow,
interceptor_state,
queue_states,
tick_timer,
Expand Down Expand Up @@ -489,6 +490,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
?LG_PROCESS_TYPE(channel),
?store_proc_name({ConnName, Channel}),
ok = pg_local:join(rabbit_channels, self()),
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
true -> flow;
false -> noflow
end,
{ok, {Global0, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
Limiter0 = rabbit_limiter:new(LimiterPid),
Global = Global0 andalso is_global_qos_permitted(),
Expand Down Expand Up @@ -537,6 +542,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
rejected = [],
confirmed = [],
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
queue_states = rabbit_queue_type:init()
},
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ deliver(Qs0, Msg0, Options) ->
Confirm = MsgSeqNo /= undefined,

{MPids, Qs} = qpids(Qs0, Confirm, MsgSeqNo),
Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo),
Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo, Flow),

case Flow of
%% Here we are tracking messages sent by the rabbit_channel
Expand Down
122 changes: 121 additions & 1 deletion deps/rabbit/test/classic_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile([nowarn_export_all, export_all]).

Expand All @@ -18,11 +19,17 @@

all() ->
[
{group, cluster_size_1},
{group, cluster_size_3}
].

groups() ->
[
{cluster_size_1, [], [
classic_queue_flow_control_enabled,
classic_queue_flow_control_disabled
]
},
{cluster_size_3, [], [
leader_locator_client_local,
leader_locator_balanced,
Expand All @@ -42,10 +49,14 @@ end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

init_per_group(Group, Config) ->
Nodes = case Group of
cluster_size_1 -> 1;
cluster_size_3 -> 3
end,
Config1 = rabbit_ct_helpers:set_config(Config,
[
{rmq_nodename_suffix, Group},
{rmq_nodes_count, 3},
{rmq_nodes_count, Nodes},
{rmq_nodes_clustered, true},
{tcp_ports_base, {skip_n_nodes, 3}}
]),
Expand All @@ -72,6 +83,67 @@ init_per_testcase(T, Config) ->
%% Testcases.
%% -------------------------------------------------------------------

classic_queue_flow_control_enabled(Config) ->
FlowEnabled = true,
VerifyFun =
fun(QPid, ConnPid) ->
%% Only 2+2 messages reach the message queue of the classic queue.
%% (before the credits of the connection and channel processes run out)
?awaitMatch(4, proc_info(QPid, message_queue_len), 1000),
?assertMatch({0, _}, gen_server2_queue(QPid)),

%% The connection gets into flow state
?assertEqual([{state, flow}], rabbit_reader:info(ConnPid, [state])),

Dict = proc_info(ConnPid, dictionary),
?assertMatch([_|_], proplists:get_value(credit_blocked, Dict)),
ok
end,
flow_control(Config, FlowEnabled, VerifyFun).

classic_queue_flow_control_disabled(Config) ->
FlowEnabled = false,
VerifyFun =
fun(QPid, ConnPid) ->
%% All published messages will end up in the message
%% queue of the suspended classic queue process
?awaitMatch(100, proc_info(QPid, message_queue_len), 1000),
?assertMatch({0, _}, gen_server2_queue(QPid)),

%% The connection dos not get into flow state
?assertEqual([{state, running}], rabbit_reader:info(ConnPid, [state])),

Dict = proc_info(ConnPid, dictionary),
?assertMatch([], proplists:get_value(credit_blocked, Dict, []))
end,
flow_control(Config, FlowEnabled, VerifyFun).

flow_control(Config, FlowEnabled, VerifyFun) ->
OrigCredit = set_default_credit(Config, {2, 1}),
OrigFlow = set_flow_control(Config, FlowEnabled),

Ch = rabbit_ct_client_helpers:open_channel(Config),
QueueName = atom_to_binary(?FUNCTION_NAME),
declare(Ch, QueueName, [{<<"x-queue-type">>, longstr, <<"classic">>}]),
QPid = get_queue_pid(Config, QueueName),
try
sys:suspend(QPid),

%% Publish 100 messages without publisher confirms
publish_many(Ch, QueueName, 100),

[ConnPid] = rabbit_ct_broker_helpers:rpc(Config, rabbit_networking, local_connections, []),

VerifyFun(QPid, ConnPid),
ok
after
sys:resume(QPid),
delete_queues(Ch, [QueueName]),
set_default_credit(Config, OrigCredit),
set_flow_control(Config, OrigFlow),
rabbit_ct_client_helpers:close_channel(Ch)
end.

leader_locator_client_local(Config) ->
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Q = <<"q1">>,
Expand Down Expand Up @@ -129,7 +201,55 @@ declare(Ch, Q, Args) ->
auto_delete = false,
arguments = Args}).

delete_queues(Ch, Qs) ->
[?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- Qs].

delete_queues() ->
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|| Q <- rabbit_amqqueue:list()].


publish(Ch, QName, Payload) ->
amqp_channel:cast(Ch,
#'basic.publish'{exchange = <<>>,
routing_key = QName},
#amqp_msg{payload = Payload}).

publish_many(Ch, QName, Count) ->
[publish(Ch, QName, integer_to_binary(I))
|| I <- lists:seq(1, Count)].

proc_info(Pid, Info) ->
case rabbit_misc:process_info(Pid, Info) of
{Info, Value} ->
Value;
Error ->
{error, Error}
end.

gen_server2_queue(Pid) ->
Status = sys:get_status(Pid),
{status, Pid,_Mod,
[_Dict, _SysStatus, _Parent, _Dbg,
[{header, _},
{data, Data}|_]]} = Status,
proplists:get_value("Queued messages", Data).

set_default_credit(Config, Value) ->
Key = credit_flow_default_credit,
OrigValue = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]),
ok = rabbit_ct_broker_helpers:rpc(Config, persistent_term, put, [Key, Value]),
OrigValue.

set_flow_control(Config, Value) when is_boolean(Value) ->
Key = classic_queue_flow_control,
{ok, OrigValue} = rabbit_ct_broker_helpers:rpc(Config, application, get_env, [rabbit, Key]),
rabbit_ct_broker_helpers:rpc(Config, application, set_env, [rabbit, Key, Value]),
OrigValue.

get_queue_pid(Config, QueueName) ->
{ok, QRec} = rabbit_ct_broker_helpers:rpc(
Config, 0, rabbit_amqqueue, lookup, [QueueName, <<"/">>]),
amqqueue:get_pid(QRec).
11 changes: 10 additions & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
published = false :: boolean(),
ssl_login_name :: none | binary(),
retainer_pid :: pid(),
delivery_flow :: flow | noflow,
trace_state :: rabbit_trace:state(),
prefetch :: non_neg_integer(),
vhost :: rabbit_types:vhost(),
Expand Down Expand Up @@ -148,6 +149,10 @@ process_connect(
"protocol version: ~p, keepalive: ~p, property names: ~p",
[ClientId0, Username0, CleanStart, ProtoVer, KeepaliveSecs, maps:keys(ConnectProps)]),
SslLoginName = ssl_login_name(Socket),
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
true -> flow;
false -> noflow
end,
MaxPacketSize = maps:get('Maximum-Packet-Size', ConnectProps, ?MAX_PACKET_SIZE),
TopicAliasMax = persistent_term:get(?PERSISTENT_TERM_TOPIC_ALIAS_MAXIMUM),
TopicAliasMaxOutbound = min(maps:get('Topic-Alias-Maximum', ConnectProps, 0), TopicAliasMax),
Expand Down Expand Up @@ -208,6 +213,7 @@ process_connect(
clean_start = CleanStart,
session_expiry_interval_secs = SessionExpiry,
ssl_login_name = SslLoginName,
delivery_flow = Flow,
trace_state = TraceState,
prefetch = prefetch(ConnectProps),
conn_name = ConnName,
Expand Down Expand Up @@ -1552,6 +1558,7 @@ publish_to_queues(
#mqtt_msg{topic = Topic,
packet_id = PacketId} = MqttMsg,
#state{cfg = #cfg{exchange = ExchangeName = #resource{name = ExchangeNameBin},
delivery_flow = Flow,
conn_name = ConnName,
trace_state = TraceState},
auth_state = #auth_state{user = #user{username = Username}}} = State) ->
Expand All @@ -1564,7 +1571,7 @@ publish_to_queues(
QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}),
QNames = drop_local(QNames0, State),
rabbit_trace:tap_in(Msg, QNames, ConnName, Username, TraceState),
Opts = maps_put_truthy(correlation, PacketId, #{}),
Opts = maps_put_truthy(flow, Flow, maps_put_truthy(correlation, PacketId, #{})),
deliver_to_queues(Msg, Opts, QNames, State);
{error, not_found} ->
?LOG_ERROR("~s not found", [rabbit_misc:rs(ExchangeName)]),
Expand Down Expand Up @@ -2478,6 +2485,7 @@ format_status(
published = Published,
ssl_login_name = SSLLoginName,
retainer_pid = RetainerPid,
delivery_flow = DeliveryFlow,
trace_state = TraceState,
prefetch = Prefetch,
client_id = ClientID,
Expand All @@ -2499,6 +2507,7 @@ format_status(
ssl_login_name => SSLLoginName,
retainer_pid => RetainerPid,

delivery_flow => DeliveryFlow,
trace_state => TraceState,
prefetch => Prefetch,
client_id => ClientID,
Expand Down
19 changes: 19 additions & 0 deletions deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ cluster_size_3_tests() ->
pubsub,
queue_down_qos1,
consuming_classic_queue_down,
flow_classic_queue,
flow_quorum_queue,
flow_stream,
rabbit_mqtt_qos0_queue,
Expand Down Expand Up @@ -486,6 +487,24 @@ publish_to_all_non_deprecated_queue_types(Config, QoS) ->
?awaitMatch([],
all_connection_pids(Config), 10_000, 1000).

%% This test case does not require multiple nodes
%% but it is grouped together with flow test cases for other queue types
%% (and historically used to use a mirrored classic queue on multiple nodes)
flow_classic_queue(Config) ->
%% New nodes lookup via persistent_term:get/1 (since 4.0.0)
%% Old nodes lookup via application:get_env/2. (that is taken care of by flow/3)
%% Therefore, we set both persistent_term and application.
Key = credit_flow_default_credit,
Val = {2, 1},
DefaultVal = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]),
Result = rpc_all(Config, persistent_term, put, [Key, Val]),
?assert(lists:all(fun(R) -> R =:= ok end, Result)),

flow(Config, {rabbit, Key, Val}, <<"classic">>),

?assertEqual(Result, rpc_all(Config, persistent_term, put, [Key, DefaultVal])),
ok.

flow_quorum_queue(Config) ->
flow(Config, {rabbit, quorum_commands_soft_limit, 1}, <<"quorum">>).

Expand Down

0 comments on commit d65bd7d

Please sign in to comment.