Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore credit_flow between AMQP 0.9.1 channel/MQTT connection -> CQ processes #12906

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ _APP_ENV = """[
{exit_on_close, false}
]},
{ssl_apps, [asn1, crypto, public_key, ssl]},
%% see rabbitmq-server#114
{classic_queue_flow_control, true},
%% see rabbitmq-server#227 and related tickets.
%% msg_store_credit_disc_bound only takes effect when
%% messages are persisted to the message store. If messages
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ define PROJECT_ENV
{exit_on_close, false}
]},
{ssl_apps, [asn1, crypto, public_key, ssl]},
%% see rabbitmq-server#114
{classic_queue_flow_control, true},
%% see rabbitmq-server#227 and related tickets.
%% msg_store_credit_disc_bound only takes effect when
%% messages are persisted to the message store. If messages
Expand Down
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@
rejected,
%% used by "one shot RPC" (amq.
reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()},
delivery_flow, %% Deprecated since removal of CMQ in 4.0
%% see rabbitmq-server#114
delivery_flow :: flow | noflow,
interceptor_state,
queue_states,
tick_timer,
Expand Down Expand Up @@ -489,6 +490,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
?LG_PROCESS_TYPE(channel),
?store_proc_name({ConnName, Channel}),
ok = pg_local:join(rabbit_channels, self()),
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
true -> flow;
false -> noflow
end,
{ok, {Global0, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
Limiter0 = rabbit_limiter:new(LimiterPid),
Global = Global0 andalso is_global_qos_permitted(),
Expand Down Expand Up @@ -537,6 +542,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
rejected = [],
confirmed = [],
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
queue_states = rabbit_queue_type:init()
},
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ deliver(Qs0, Msg0, Options) ->
Confirm = MsgSeqNo /= undefined,

{MPids, Qs} = qpids(Qs0, Confirm, MsgSeqNo),
Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo),
Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo, Flow),

case Flow of
%% Here we are tracking messages sent by the rabbit_channel
Expand Down
122 changes: 121 additions & 1 deletion deps/rabbit/test/classic_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile([nowarn_export_all, export_all]).

Expand All @@ -18,11 +19,17 @@

all() ->
[
{group, cluster_size_1},
{group, cluster_size_3}
].

groups() ->
[
{cluster_size_1, [], [
classic_queue_flow_control_enabled,
classic_queue_flow_control_disabled
]
},
{cluster_size_3, [], [
leader_locator_client_local,
leader_locator_balanced,
Expand All @@ -42,10 +49,14 @@ end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

init_per_group(Group, Config) ->
Nodes = case Group of
cluster_size_1 -> 1;
cluster_size_3 -> 3
end,
Config1 = rabbit_ct_helpers:set_config(Config,
[
{rmq_nodename_suffix, Group},
{rmq_nodes_count, 3},
{rmq_nodes_count, Nodes},
{rmq_nodes_clustered, true},
{tcp_ports_base, {skip_n_nodes, 3}}
]),
Expand All @@ -72,6 +83,67 @@ init_per_testcase(T, Config) ->
%% Testcases.
%% -------------------------------------------------------------------

classic_queue_flow_control_enabled(Config) ->
FlowEnabled = true,
VerifyFun =
fun(QPid, ConnPid) ->
%% Only 2+2 messages reach the message queue of the classic queue.
%% (before the credits of the connection and channel processes run out)
?awaitMatch(4, proc_info(QPid, message_queue_len), 1000),
?assertMatch({0, _}, gen_server2_queue(QPid)),

%% The connection gets into flow state
?assertEqual([{state, flow}], rabbit_reader:info(ConnPid, [state])),

Dict = proc_info(ConnPid, dictionary),
?assertMatch([_|_], proplists:get_value(credit_blocked, Dict)),
ok
end,
flow_control(Config, FlowEnabled, VerifyFun).

classic_queue_flow_control_disabled(Config) ->
FlowEnabled = false,
VerifyFun =
fun(QPid, ConnPid) ->
%% All published messages will end up in the message
%% queue of the suspended classic queue process
?awaitMatch(100, proc_info(QPid, message_queue_len), 1000),
?assertMatch({0, _}, gen_server2_queue(QPid)),

%% The connection dos not get into flow state
?assertEqual([{state, running}], rabbit_reader:info(ConnPid, [state])),

Dict = proc_info(ConnPid, dictionary),
?assertMatch([], proplists:get_value(credit_blocked, Dict, []))
end,
flow_control(Config, FlowEnabled, VerifyFun).

flow_control(Config, FlowEnabled, VerifyFun) ->
OrigCredit = set_default_credit(Config, {2, 1}),
OrigFlow = set_flow_control(Config, FlowEnabled),

Ch = rabbit_ct_client_helpers:open_channel(Config),
QueueName = atom_to_binary(?FUNCTION_NAME),
declare(Ch, QueueName, [{<<"x-queue-type">>, longstr, <<"classic">>}]),
QPid = get_queue_pid(Config, QueueName),
try
sys:suspend(QPid),

%% Publish 100 messages without publisher confirms
publish_many(Ch, QueueName, 100),

[ConnPid] = rabbit_ct_broker_helpers:rpc(Config, rabbit_networking, local_connections, []),

VerifyFun(QPid, ConnPid),
ok
after
sys:resume(QPid),
delete_queues(Ch, [QueueName]),
set_default_credit(Config, OrigCredit),
set_flow_control(Config, OrigFlow),
rabbit_ct_client_helpers:close_channel(Ch)
end.

leader_locator_client_local(Config) ->
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Q = <<"q1">>,
Expand Down Expand Up @@ -129,7 +201,55 @@ declare(Ch, Q, Args) ->
auto_delete = false,
arguments = Args}).

delete_queues(Ch, Qs) ->
[?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- Qs].

delete_queues() ->
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|| Q <- rabbit_amqqueue:list()].


publish(Ch, QName, Payload) ->
amqp_channel:cast(Ch,
#'basic.publish'{exchange = <<>>,
routing_key = QName},
#amqp_msg{payload = Payload}).

publish_many(Ch, QName, Count) ->
[publish(Ch, QName, integer_to_binary(I))
|| I <- lists:seq(1, Count)].

proc_info(Pid, Info) ->
case rabbit_misc:process_info(Pid, Info) of
{Info, Value} ->
Value;
Error ->
{error, Error}
end.

gen_server2_queue(Pid) ->
Status = sys:get_status(Pid),
{status, Pid,_Mod,
[_Dict, _SysStatus, _Parent, _Dbg,
[{header, _},
{data, Data}|_]]} = Status,
proplists:get_value("Queued messages", Data).

set_default_credit(Config, Value) ->
Key = credit_flow_default_credit,
OrigValue = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]),
ok = rabbit_ct_broker_helpers:rpc(Config, persistent_term, put, [Key, Value]),
OrigValue.

set_flow_control(Config, Value) when is_boolean(Value) ->
Key = classic_queue_flow_control,
{ok, OrigValue} = rabbit_ct_broker_helpers:rpc(Config, application, get_env, [rabbit, Key]),
rabbit_ct_broker_helpers:rpc(Config, application, set_env, [rabbit, Key, Value]),
OrigValue.

get_queue_pid(Config, QueueName) ->
{ok, QRec} = rabbit_ct_broker_helpers:rpc(
Config, 0, rabbit_amqqueue, lookup, [QueueName, <<"/">>]),
amqqueue:get_pid(QRec).
20 changes: 19 additions & 1 deletion deps/rabbit_common/src/rabbit_misc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
build_acyclic_graph/3]).
-export([const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
-export([is_process_alive/1,
process_info/2]).
-export([pget/2, pget/3, pupdate/3, pget_or_die/2, pmerge/3, pset/3, plmerge/2]).
-export([deep_pget/2, deep_pget/3]).
-export([format_message_queue/2]).
Expand Down Expand Up @@ -812,6 +813,23 @@ is_process_alive(Pid) ->
lists:member(Node, [node() | nodes(connected)]) andalso
rpc:call(Node, erlang, is_process_alive, [Pid]) =:= true.

%% Get process info of a prossibly remote process.
%% We try to avoid reconnecting to down nodes.
-spec process_info(pid(), ItemSpec) -> Result| undefined | {badrpc, term()}
when
ItemSpec :: atom() | list() | tuple(),
Result :: {atom() | tuple(), term()} | [{atom() | tuple(), term()}].
process_info(Pid, Items) when node(Pid) =:= node() ->
erlang:process_info(Pid, Items);
process_info(Pid, Items) ->
Node = node(Pid),
case lists:member(Node, [node() | nodes(connected)]) of
true ->
rpc:call(Node, erlang, process_info, [Pid, Items]);
_ ->
{badrpc, nodedown}
end.

-spec pget(term(), list() | map()) -> term().
pget(K, M) when is_map(M) ->
maps:get(K, M, undefined);
Expand Down
11 changes: 10 additions & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
published = false :: boolean(),
ssl_login_name :: none | binary(),
retainer_pid :: pid(),
delivery_flow :: flow | noflow,
trace_state :: rabbit_trace:state(),
prefetch :: non_neg_integer(),
vhost :: rabbit_types:vhost(),
Expand Down Expand Up @@ -148,6 +149,10 @@ process_connect(
"protocol version: ~p, keepalive: ~p, property names: ~p",
[ClientId0, Username0, CleanStart, ProtoVer, KeepaliveSecs, maps:keys(ConnectProps)]),
SslLoginName = ssl_login_name(Socket),
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
true -> flow;
false -> noflow
end,
MaxPacketSize = maps:get('Maximum-Packet-Size', ConnectProps, ?MAX_PACKET_SIZE),
TopicAliasMax = persistent_term:get(?PERSISTENT_TERM_TOPIC_ALIAS_MAXIMUM),
TopicAliasMaxOutbound = min(maps:get('Topic-Alias-Maximum', ConnectProps, 0), TopicAliasMax),
Expand Down Expand Up @@ -208,6 +213,7 @@ process_connect(
clean_start = CleanStart,
session_expiry_interval_secs = SessionExpiry,
ssl_login_name = SslLoginName,
delivery_flow = Flow,
trace_state = TraceState,
prefetch = prefetch(ConnectProps),
conn_name = ConnName,
Expand Down Expand Up @@ -1552,6 +1558,7 @@ publish_to_queues(
#mqtt_msg{topic = Topic,
packet_id = PacketId} = MqttMsg,
#state{cfg = #cfg{exchange = ExchangeName = #resource{name = ExchangeNameBin},
delivery_flow = Flow,
conn_name = ConnName,
trace_state = TraceState},
auth_state = #auth_state{user = #user{username = Username}}} = State) ->
Expand All @@ -1564,7 +1571,7 @@ publish_to_queues(
QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}),
QNames = drop_local(QNames0, State),
rabbit_trace:tap_in(Msg, QNames, ConnName, Username, TraceState),
Opts = maps_put_truthy(correlation, PacketId, #{}),
Opts = maps_put_truthy(flow, Flow, maps_put_truthy(correlation, PacketId, #{})),
deliver_to_queues(Msg, Opts, QNames, State);
{error, not_found} ->
?LOG_ERROR("~s not found", [rabbit_misc:rs(ExchangeName)]),
Expand Down Expand Up @@ -2478,6 +2485,7 @@ format_status(
published = Published,
ssl_login_name = SSLLoginName,
retainer_pid = RetainerPid,
delivery_flow = DeliveryFlow,
trace_state = TraceState,
prefetch = Prefetch,
client_id = ClientID,
Expand All @@ -2499,6 +2507,7 @@ format_status(
ssl_login_name => SSLLoginName,
retainer_pid => RetainerPid,

delivery_flow => DeliveryFlow,
trace_state => TraceState,
prefetch => Prefetch,
client_id => ClientID,
Expand Down
19 changes: 19 additions & 0 deletions deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ cluster_size_3_tests() ->
pubsub,
queue_down_qos1,
consuming_classic_queue_down,
flow_classic_queue,
flow_quorum_queue,
flow_stream,
rabbit_mqtt_qos0_queue,
Expand Down Expand Up @@ -486,6 +487,24 @@ publish_to_all_non_deprecated_queue_types(Config, QoS) ->
?awaitMatch([],
all_connection_pids(Config), 10_000, 1000).

%% This test case does not require multiple nodes
%% but it is grouped together with flow test cases for other queue types
%% (and historically used to use a mirrored classic queue on multiple nodes)
flow_classic_queue(Config) ->
%% New nodes lookup via persistent_term:get/1 (since 4.0.0)
%% Old nodes lookup via application:get_env/2. (that is taken care of by flow/3)
%% Therefore, we set both persistent_term and application.
Key = credit_flow_default_credit,
Val = {2, 1},
DefaultVal = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]),
Result = rpc_all(Config, persistent_term, put, [Key, Val]),
?assert(lists:all(fun(R) -> R =:= ok end, Result)),

flow(Config, {rabbit, Key, Val}, <<"classic">>),

?assertEqual(Result, rpc_all(Config, persistent_term, put, [Key, DefaultVal])),
ok.

flow_quorum_queue(Config) ->
flow(Config, {rabbit, quorum_commands_soft_limit, 1}, <<"quorum">>).

Expand Down
Loading