Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QQ: introduce a default delivery limit #11937

Merged
merged 3 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -850,10 +850,9 @@ overview(#?STATE{consumers = Cons,
#{}
end,
MsgsRet = lqueue:len(Returns),

#{len := _MsgsLen,
num_hi := MsgsHi,
#{num_hi := MsgsHi,
num_lo := MsgsLo} = rabbit_fifo_q:overview(Messages),

Overview = #{type => ?STATE,
config => Conf,
num_consumers => map_size(Cons),
Expand Down
24 changes: 21 additions & 3 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@
-define(RA_SYSTEM, quorum_queues).
-define(RA_WAL_NAME, ra_log_wal).

-define(DEFAULT_DELIVERY_LIMIT, 20).

-define(INFO(Str, Args),
rabbit_log:info("[~s:~s/~b] " Str,
[?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY | Args])).
Expand Down Expand Up @@ -320,7 +322,14 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
OverflowBin = args_policy_lookup(<<"overflow">>, fun policyHasPrecedence/2, Q),
Overflow = overflow(OverflowBin, drop_head, QName),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>, fun min/2, Q) of
undefined ->
rabbit_log:info("~ts: delivery_limit not set, defaulting to ~b",
[rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]),
?DEFAULT_DELIVERY_LIMIT;
DL ->
DL
end,
Expires = args_policy_lookup(<<"expires">>, fun min/2, Q),
MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q),
#{name => Name,
Expand Down Expand Up @@ -508,11 +517,12 @@ spawn_notify_decorators(QName, Fun, Args) ->
catch notify_decorators(QName, Fun, Args).

handle_tick(QName,
#{config := #{name := Name},
#{config := #{name := Name} = Cfg,
num_active_consumers := NumConsumers,
num_checked_out := NumCheckedOut,
num_ready_messages := NumReadyMsgs,
num_messages := NumMessages,
num_enqueuers := NumEnqueuers,
enqueue_message_bytes := EnqueueBytes,
checkout_message_bytes := CheckoutBytes,
num_discarded := NumDiscarded,
Expand Down Expand Up @@ -559,6 +569,7 @@ handle_tick(QName,
MsgBytesDiscarded = DiscardBytes + DiscardCheckoutBytes,
MsgBytes = EnqueueBytes + CheckoutBytes + MsgBytesDiscarded,
Infos = [{consumers, NumConsumers},
{publishers, NumEnqueuers},
{consumer_capacity, Util},
{consumer_utilisation, Util},
{messages, NumMessages},
Expand All @@ -573,7 +584,14 @@ handle_tick(QName,
{message_bytes_dlx, MsgBytesDiscarded},
{single_active_consumer_tag, SacTag},
{single_active_consumer_pid, SacPid},
{leader, node()}
{leader, node()},
{delivery_limit, case maps:get(delivery_limit, Cfg,
undefined) of
undefined ->
unlimited;
Limit ->
Limit
end}
| Infos0],
rabbit_core_metrics:queue_stats(QName, Infos),
ok = repair_leader_record(Q, Self),
Expand Down
3 changes: 3 additions & 0 deletions deps/rabbitmq_management/priv/www/js/global.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ var HELP = {
'queue-dead-lettered':
'Applies to messages dead-lettered with dead-letter-strategy <code>at-least-once</code>.',

'queue-delivery-limit':
'The number of times a message can be returned to this queue before it is dead-lettered (if configured) or dropped.',

'queue-message-body-bytes':
'<p>The sum total of the sizes of the message bodies in this queue. This only counts message bodies; it does not include message properties (including headers) or metadata used by the queue.</p><p>Note that "in memory" and "persistent" are not mutually exclusive; persistent messages can be in memory as well as on disc, and transient messages can be paged out if memory is tight. Non-durable queues will consider all messages to be transient.</p><p>If a message is routed to multiple queues on publication, its body will be stored only once (in memory and on disk) and shared between queues. The value shown here does not take account of this effect.</p>',

Expand Down
22 changes: 18 additions & 4 deletions deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,29 @@
<td><%= fmt_string(queue.consumer_details.length) %></td>
</tr>
<% } %>
<% if (!is_stream(queue)) { %>
<% if (is_classic(queue)) { %>
<tr>
<th>Consumer capacity <span class="help" id="queue-consumer-capacity"></th>
<td><%= fmt_percent(queue.consumer_capacity) %></td>
</tr>
<% } %>
<% if(queue.hasOwnProperty('publishers')) { %>
<tr>
<th>Publishers</th>
<td><%= fmt_string(queue.publishers) %></td>
</tr>
<% } %>
<% if (is_quorum(queue)) { %>
<tr>
<th>Open files</th>
<td><%= fmt_table_short(queue.open_files) %></td>
</tr>
<% if (queue.hasOwnProperty('delivery_limit')) { %>
<tr>
<th>Delivery limit <span class="help" id="queue-delivery-limit"></th>
<td><%= fmt_string(queue.delivery_limit) %></td>
</tr>
<% } %>
<% } %>
<% if (is_stream(queue)) { %>
<tr>
Expand Down Expand Up @@ -187,20 +199,22 @@
<td class="r">
<%= fmt_bytes(queue.message_bytes_unacknowledged) %>
</td>
<td class="r">
<%= fmt_bytes(queue.message_bytes_ram) %>
</td>
<% } %>
<% if (is_quorum(queue)) { %>
<td class="r">
</td>
<td class="r">
</td>
<td class="r">
</td>
<td class="r">
<%= fmt_bytes(queue.message_bytes_dlx) %>
</td>
<% } %>
<% if (is_classic(queue)) { %>
<td class="r">
<%= fmt_bytes(queue.message_bytes_ram) %>
</td>
<td class="r">
<%= fmt_bytes(queue.message_bytes_persistent) %>
</td>
Expand Down
6 changes: 5 additions & 1 deletion release-notes/4.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ See Compatibility Notes below to learn about **breaking or potentially breaking
* RabbitMQ 3.13 `rabbitmq.conf` setting `rabbitmq_amqp1_0.default_vhost` is unsupported in RabbitMQ 4.0.
Instead `default_vhost` will be used to determine the default vhost an AMQP 1.0 client connects to(i.e. when the AMQP 1.0 client does not define the vhost in the `hostname` field of the `open` frame)
* RabbitMQ Shovels will be able connect to a RabbitMQ 4.0 node via AMQP 1.0 only when the Shovel runs on a RabbitMQ node >= `3.13.7`
* Quorum queues will now always set a default `delivery-limit` of 20 which can be increased or decreased by policies and queue arguments but cannot be unset. Some applications or configurations may need to be updated to handle this.

## Erlang/OTP Compatibility Notes

Expand Down Expand Up @@ -83,8 +84,11 @@ periods of time (no more than a few hours).

### Recommended Post-upgrade Procedures

TBD
Set a low priority dead lettering policy for all quorum queues to dead letter to a stream or similar
so that messages that reach the new default delivery limit of 20 aren't lost completely
when no dead lettering policy is in place.

TBD

## Changes Worth Mentioning

Expand Down
Loading