From d65bd7d07a48b6122256e62349e8fe441e488f32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20G=C3=B6m=C3=B6ri?= Date: Tue, 3 Dec 2024 17:34:32 +0100 Subject: [PATCH] Restore credit_flow between channel/MQTT connection -> CQ processes The credit_flow between publishing AMQP 0.9.1 channel (or MQTT connection) and (non-mirrored) classic queue processes was unintentionally removed in 4.0 together with anything else related to CQ mirroring. By default we restore the 3.x behaviour for non-mirrored classic queues. It is possible to disable flow-control (the earlier 4.0.x behaviour) with the new env `classic_queue_flow_control`. In 3.x this was possible with the config `mirroring_flow_control`. --- deps/rabbit/BUILD.bazel | 2 + deps/rabbit/Makefile | 2 + deps/rabbit/src/rabbit_channel.erl | 8 +- deps/rabbit/src/rabbit_classic_queue.erl | 2 +- deps/rabbit/test/classic_queue_SUITE.erl | 122 +++++++++++++++++- .../src/rabbit_mqtt_processor.erl | 11 +- deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl | 19 +++ 7 files changed, 162 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 76be5953a6c3..68d5f16da884 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -85,6 +85,8 @@ _APP_ENV = """[ {exit_on_close, false} ]}, {ssl_apps, [asn1, crypto, public_key, ssl]}, + %% see rabbitmq-server#114 + {classic_queue_flow_control, true}, %% see rabbitmq-server#227 and related tickets. %% msg_store_credit_disc_bound only takes effect when %% messages are persisted to the message store. If messages diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index f494ee760ac0..a720a36fceff 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -68,6 +68,8 @@ define PROJECT_ENV {exit_on_close, false} ]}, {ssl_apps, [asn1, crypto, public_key, ssl]}, + %% see rabbitmq-server#114 + {classic_queue_flow_control, true}, %% see rabbitmq-server#227 and related tickets. 
%% msg_store_credit_disc_bound only takes effect when %% messages are persisted to the message store. If messages diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 0d7bd5bf45d7..9ca11e4dea15 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -159,7 +159,8 @@ rejected, %% used by "one shot RPC" (amq. reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()}, - delivery_flow, %% Deprecated since removal of CMQ in 4.0 + %% see rabbitmq-server#114 + delivery_flow :: flow | noflow, interceptor_state, queue_states, tick_timer, @@ -489,6 +490,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, ?LG_PROCESS_TYPE(channel), ?store_proc_name({ConnName, Channel}), ok = pg_local:join(rabbit_channels, self()), + Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of + true -> flow; + false -> noflow + end, {ok, {Global0, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch), Limiter0 = rabbit_limiter:new(LimiterPid), Global = Global0 andalso is_global_qos_permitted(), @@ -537,6 +542,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, rejected = [], confirmed = [], reply_consumer = none, + delivery_flow = Flow, interceptor_state = undefined, queue_states = rabbit_queue_type:init() }, diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index b7ed084ac0a3..0f92f863bf6f 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -449,7 +449,7 @@ deliver(Qs0, Msg0, Options) -> Confirm = MsgSeqNo /= undefined, {MPids, Qs} = qpids(Qs0, Confirm, MsgSeqNo), - Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo), + Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo, Flow), case Flow of %% Here we are tracking messages sent by the rabbit_channel diff --git 
a/deps/rabbit/test/classic_queue_SUITE.erl b/deps/rabbit/test/classic_queue_SUITE.erl index 5b54d7150fb0..bfa626a38255 100644 --- a/deps/rabbit/test/classic_queue_SUITE.erl +++ b/deps/rabbit/test/classic_queue_SUITE.erl @@ -8,6 +8,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile([nowarn_export_all, export_all]). @@ -18,11 +19,17 @@ all() -> [ + {group, cluster_size_1}, {group, cluster_size_3} ]. groups() -> [ + {cluster_size_1, [], [ + classic_queue_flow_control_enabled, + classic_queue_flow_control_disabled + ] + }, {cluster_size_3, [], [ leader_locator_client_local, leader_locator_balanced, @@ -42,10 +49,14 @@ end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). init_per_group(Group, Config) -> + Nodes = case Group of + cluster_size_1 -> 1; + cluster_size_3 -> 3 + end, Config1 = rabbit_ct_helpers:set_config(Config, [ {rmq_nodename_suffix, Group}, - {rmq_nodes_count, 3}, + {rmq_nodes_count, Nodes}, {rmq_nodes_clustered, true}, {tcp_ports_base, {skip_n_nodes, 3}} ]), @@ -72,6 +83,67 @@ init_per_testcase(T, Config) -> %% Testcases. %% ------------------------------------------------------------------- +classic_queue_flow_control_enabled(Config) -> + FlowEnabled = true, + VerifyFun = + fun(QPid, ConnPid) -> + %% Only 2+2 messages reach the message queue of the classic queue. + %% (before the credits of the connection and channel processes run out) + ?awaitMatch(4, proc_info(QPid, message_queue_len), 1000), + ?assertMatch({0, _}, gen_server2_queue(QPid)), + + %% The connection gets into flow state + ?assertEqual([{state, flow}], rabbit_reader:info(ConnPid, [state])), + + Dict = proc_info(ConnPid, dictionary), + ?assertMatch([_|_], proplists:get_value(credit_blocked, Dict)), + ok + end, + flow_control(Config, FlowEnabled, VerifyFun). 
+ +classic_queue_flow_control_disabled(Config) -> + FlowEnabled = false, + VerifyFun = + fun(QPid, ConnPid) -> + %% All published messages will end up in the message + %% queue of the suspended classic queue process + ?awaitMatch(100, proc_info(QPid, message_queue_len), 1000), + ?assertMatch({0, _}, gen_server2_queue(QPid)), + + %% The connection does not get into flow state + ?assertEqual([{state, running}], rabbit_reader:info(ConnPid, [state])), + + Dict = proc_info(ConnPid, dictionary), + ?assertMatch([], proplists:get_value(credit_blocked, Dict, [])) + end, + flow_control(Config, FlowEnabled, VerifyFun). + +flow_control(Config, FlowEnabled, VerifyFun) -> + OrigCredit = set_default_credit(Config, {2, 1}), + OrigFlow = set_flow_control(Config, FlowEnabled), + + Ch = rabbit_ct_client_helpers:open_channel(Config), + QueueName = atom_to_binary(?FUNCTION_NAME), + declare(Ch, QueueName, [{<<"x-queue-type">>, longstr, <<"classic">>}]), + QPid = get_queue_pid(Config, QueueName), + try + sys:suspend(QPid), + + %% Publish 100 messages without publisher confirms + publish_many(Ch, QueueName, 100), + + [ConnPid] = rabbit_ct_broker_helpers:rpc(Config, rabbit_networking, local_connections, []), + + VerifyFun(QPid, ConnPid), + ok + after + sys:resume(QPid), + delete_queues(Ch, [QueueName]), + set_default_credit(Config, OrigCredit), + set_flow_control(Config, OrigFlow), + rabbit_ct_client_helpers:close_channel(Ch) + end. + leader_locator_client_local(Config) -> Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Q = <<"q1">>, @@ -129,7 +201,55 @@ declare(Ch, Q, Args) -> auto_delete = false, arguments = Args}). +delete_queues(Ch, Qs) -> + [?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})) + || Q <- Qs]. + delete_queues() -> [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) || Q <- rabbit_amqqueue:list()]. 
+ +publish(Ch, QName, Payload) -> + amqp_channel:cast(Ch, + #'basic.publish'{exchange = <<>>, + routing_key = QName}, + #amqp_msg{payload = Payload}). + +publish_many(Ch, QName, Count) -> + [publish(Ch, QName, integer_to_binary(I)) + || I <- lists:seq(1, Count)]. + +proc_info(Pid, Info) -> + case rabbit_misc:process_info(Pid, Info) of + {Info, Value} -> + Value; + Error -> + {error, Error} + end. + +gen_server2_queue(Pid) -> + Status = sys:get_status(Pid), + {status, Pid,_Mod, + [_Dict, _SysStatus, _Parent, _Dbg, + [{header, _}, + {data, Data}|_]]} = Status, + proplists:get_value("Queued messages", Data). + +set_default_credit(Config, Value) -> + Key = credit_flow_default_credit, + OrigValue = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]), + ok = rabbit_ct_broker_helpers:rpc(Config, persistent_term, put, [Key, Value]), + OrigValue. + +set_flow_control(Config, Value) when is_boolean(Value) -> + Key = classic_queue_flow_control, + {ok, OrigValue} = rabbit_ct_broker_helpers:rpc(Config, application, get_env, [rabbit, Key]), + rabbit_ct_broker_helpers:rpc(Config, application, set_env, [rabbit, Key, Value]), + OrigValue. + +get_queue_pid(Config, QueueName) -> + {ok, QRec} = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_amqqueue, lookup, [QueueName, <<"/">>]), + amqqueue:get_pid(QRec). 
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index c45f894c85e8..499790b30bf1 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -72,6 +72,7 @@ published = false :: boolean(), ssl_login_name :: none | binary(), retainer_pid :: pid(), + delivery_flow :: flow | noflow, trace_state :: rabbit_trace:state(), prefetch :: non_neg_integer(), vhost :: rabbit_types:vhost(), @@ -148,6 +149,10 @@ process_connect( "protocol version: ~p, keepalive: ~p, property names: ~p", [ClientId0, Username0, CleanStart, ProtoVer, KeepaliveSecs, maps:keys(ConnectProps)]), SslLoginName = ssl_login_name(Socket), + Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of + true -> flow; + false -> noflow + end, MaxPacketSize = maps:get('Maximum-Packet-Size', ConnectProps, ?MAX_PACKET_SIZE), TopicAliasMax = persistent_term:get(?PERSISTENT_TERM_TOPIC_ALIAS_MAXIMUM), TopicAliasMaxOutbound = min(maps:get('Topic-Alias-Maximum', ConnectProps, 0), TopicAliasMax), @@ -208,6 +213,7 @@ process_connect( clean_start = CleanStart, session_expiry_interval_secs = SessionExpiry, ssl_login_name = SslLoginName, + delivery_flow = Flow, trace_state = TraceState, prefetch = prefetch(ConnectProps), conn_name = ConnName, @@ -1552,6 +1558,7 @@ publish_to_queues( #mqtt_msg{topic = Topic, packet_id = PacketId} = MqttMsg, #state{cfg = #cfg{exchange = ExchangeName = #resource{name = ExchangeNameBin}, + delivery_flow = Flow, conn_name = ConnName, trace_state = TraceState}, auth_state = #auth_state{user = #user{username = Username}}} = State) -> @@ -1564,7 +1571,7 @@ publish_to_queues( QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}), QNames = drop_local(QNames0, State), rabbit_trace:tap_in(Msg, QNames, ConnName, Username, TraceState), - Opts = maps_put_truthy(correlation, PacketId, #{}), + Opts = maps_put_truthy(flow, Flow, 
maps_put_truthy(correlation, PacketId, #{})), deliver_to_queues(Msg, Opts, QNames, State); {error, not_found} -> ?LOG_ERROR("~s not found", [rabbit_misc:rs(ExchangeName)]), @@ -2478,6 +2485,7 @@ format_status( published = Published, ssl_login_name = SSLLoginName, retainer_pid = RetainerPid, + delivery_flow = DeliveryFlow, trace_state = TraceState, prefetch = Prefetch, client_id = ClientID, @@ -2499,6 +2507,7 @@ format_status( ssl_login_name => SSLLoginName, retainer_pid => RetainerPid, + delivery_flow => DeliveryFlow, trace_state => TraceState, prefetch => Prefetch, client_id => ClientID, diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 64c377c8af9a..32aa231c9edf 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -131,6 +131,7 @@ cluster_size_3_tests() -> pubsub, queue_down_qos1, consuming_classic_queue_down, + flow_classic_queue, flow_quorum_queue, flow_stream, rabbit_mqtt_qos0_queue, @@ -486,6 +487,24 @@ publish_to_all_non_deprecated_queue_types(Config, QoS) -> ?awaitMatch([], all_connection_pids(Config), 10_000, 1000). +%% This test case does not require multiple nodes +%% but it is grouped together with flow test cases for other queue types +%% (and historically used to use a mirrored classic queue on multiple nodes) +flow_classic_queue(Config) -> + %% New nodes lookup via persistent_term:get/1 (since 4.0.0) + %% Old nodes lookup via application:get_env/2. (that is taken care of by flow/3) + %% Therefore, we set both persistent_term and application. + Key = credit_flow_default_credit, + Val = {2, 1}, + DefaultVal = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]), + Result = rpc_all(Config, persistent_term, put, [Key, Val]), + ?assert(lists:all(fun(R) -> R =:= ok end, Result)), + + flow(Config, {rabbit, Key, Val}, <<"classic">>), + + ?assertEqual(Result, rpc_all(Config, persistent_term, put, [Key, DefaultVal])), + ok. 
+ flow_quorum_queue(Config) -> flow(Config, {rabbit, quorum_commands_soft_limit, 1}, <<"quorum">>).