diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 04c11c2db587..0289bf9418f1 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -850,10 +850,9 @@ overview(#?STATE{consumers = Cons, #{} end, MsgsRet = lqueue:len(Returns), - - #{len := _MsgsLen, - num_hi := MsgsHi, + #{num_hi := MsgsHi, num_lo := MsgsLo} = rabbit_fifo_q:overview(Messages), + Overview = #{type => ?STATE, config => Conf, num_consumers => map_size(Cons), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index fd91cde0e8c8..9f5d66faed6f 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -103,6 +103,8 @@ -define(RA_SYSTEM, quorum_queues). -define(RA_WAL_NAME, ra_log_wal). +-define(DEFAULT_DELIVERY_LIMIT, 20). + -define(INFO(Str, Args), rabbit_log:info("[~s:~s/~b] " Str, [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY | Args])). @@ -320,7 +322,14 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> OverflowBin = args_policy_lookup(<<"overflow">>, fun policyHasPrecedence/2, Q), Overflow = overflow(OverflowBin, drop_head, QName), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), - DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q), + DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>, fun min/2, Q) of + undefined -> + rabbit_log:info("~ts: delivery_limit not set, defaulting to ~b", + [rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]), + ?DEFAULT_DELIVERY_LIMIT; + DL -> + DL + end, Expires = args_policy_lookup(<<"expires">>, fun min/2, Q), MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q), #{name => Name, @@ -508,11 +517,12 @@ spawn_notify_decorators(QName, Fun, Args) -> catch notify_decorators(QName, Fun, Args). handle_tick(QName, - #{config := #{name := Name}, + #{config := #{name := Name} = Cfg, num_active_consumers := NumConsumers, num_checked_out := NumCheckedOut, num_ready_messages := NumReadyMsgs, num_messages := NumMessages, + num_enqueuers := NumEnqueuers, enqueue_message_bytes := EnqueueBytes, checkout_message_bytes := CheckoutBytes, num_discarded := NumDiscarded, @@ -559,6 +569,7 @@ handle_tick(QName, MsgBytesDiscarded = DiscardBytes + DiscardCheckoutBytes, MsgBytes = EnqueueBytes + CheckoutBytes + MsgBytesDiscarded, Infos = [{consumers, NumConsumers}, + {publishers, NumEnqueuers}, {consumer_capacity, Util}, {consumer_utilisation, Util}, {messages, NumMessages}, @@ -573,7 +584,14 @@ handle_tick(QName, {message_bytes_dlx, MsgBytesDiscarded}, {single_active_consumer_tag, SacTag}, {single_active_consumer_pid, SacPid}, - {leader, node()} + {leader, node()}, + {delivery_limit, case maps:get(delivery_limit, Cfg, + undefined) of + undefined -> + unlimited; + Limit -> + Limit + end} | Infos0], rabbit_core_metrics:queue_stats(QName, Infos), ok = repair_leader_record(Q, Self), diff --git a/deps/rabbitmq_management/priv/www/js/global.js b/deps/rabbitmq_management/priv/www/js/global.js index 2b92175742b1..7ad667e25302 100644 --- a/deps/rabbitmq_management/priv/www/js/global.js +++ b/deps/rabbitmq_management/priv/www/js/global.js @@ -256,6 +256,9 @@ var HELP = { 'queue-dead-lettered': 'Applies to messages dead-lettered with dead-letter-strategy at-least-once.', + 'queue-delivery-limit': + 'The number of times a message can be returned to this queue before it is dead-lettered (if configured) or dropped.', + 'queue-message-body-bytes': '

The sum total of the sizes of the message bodies in this queue. This only counts message bodies; it does not include message properties (including headers) or metadata used by the queue.

Note that "in memory" and "persistent" are not mutually exclusive; persistent messages can be in memory as well as on disc, and transient messages can be paged out if memory is tight. Non-durable queues will consider all messages to be transient.

If a message is routed to multiple queues on publication, its body will be stored only once (in memory and on disk) and shared between queues. The value shown here does not take account of this effect.

', diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs index ea141f0256bf..e027b32c2c81 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs @@ -92,17 +92,29 @@ <%= fmt_string(queue.consumer_details.length) %> <% } %> - <% if (!is_stream(queue)) { %> + <% if (is_classic(queue)) { %> Consumer capacity <%= fmt_percent(queue.consumer_capacity) %> <% } %> + <% if(queue.hasOwnProperty('publishers')) { %> + + Publishers + <%= fmt_string(queue.publishers) %> + + <% } %> <% if (is_quorum(queue)) { %> Open files <%= fmt_table_short(queue.open_files) %> + <% if (queue.hasOwnProperty('delivery_limit')) { %> + + Delivery limit + <%= fmt_string(queue.delivery_limit) %> + + <% } %> <% } %> <% if (is_stream(queue)) { %> @@ -187,20 +199,22 @@ <%= fmt_bytes(queue.message_bytes_unacknowledged) %> - - <%= fmt_bytes(queue.message_bytes_ram) %> - <% } %> <% if (is_quorum(queue)) { %> + + <%= fmt_bytes(queue.message_bytes_dlx) %> <% } %> <% if (is_classic(queue)) { %> + + <%= fmt_bytes(queue.message_bytes_ram) %> + <%= fmt_bytes(queue.message_bytes_persistent) %> diff --git a/release-notes/4.0.0.md b/release-notes/4.0.0.md index a379c799ffda..e551186bf276 100644 --- a/release-notes/4.0.0.md +++ b/release-notes/4.0.0.md @@ -33,6 +33,7 @@ See Compatibility Notes below to learn about **breaking or potentially breaking * RabbitMQ 3.13 `rabbitmq.conf` setting `rabbitmq_amqp1_0.default_vhost` is unsupported in RabbitMQ 4.0. Instead `default_vhost` will be used to determine the default vhost an AMQP 1.0 client connects to(i.e. when the AMQP 1.0 client does not define the vhost in the `hostname` field of the `open` frame) * RabbitMQ Shovels will be able connect to a RabbitMQ 4.0 node via AMQP 1.0 only when the Shovel runs on a RabbitMQ node >= `3.13.7` +* Quorum queues will now always set a default `delivery-limit` of 20 which can be increased or decreased by policies and queue arguments but cannot be unset. Some applications or configurations may need to be updated to handle this. ## Erlang/OTP Compatibility Notes @@ -83,8 +84,11 @@ periods of time (no more than a few hours). ### Recommended Post-upgrade Procedures -TBD +Set a low priority dead lettering policy for all quorum queues to dead letter to a stream or similar +so that messages that reach the new default delivery limit of 20 aren't lost completely +when no dead lettering policy is in place. +TBD ## Changes Worth Mentioning