Skip to content

Commit

Permalink
Merge pull request #12061 from rabbitmq/qq-prio
Browse files Browse the repository at this point in the history
Rename quorum queue priority from "low" to "normal"
  • Loading branch information
kjnilsson authored Aug 20, 2024
2 parents bd24b07 + 1c6f4be commit 4a3fa4d
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 42 deletions.
10 changes: 5 additions & 5 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ apply(#{index := Idx} = Meta,
credit = increase_credit(Con0, 1)},
State1 = State0#?STATE{ra_indexes = rabbit_fifo_index:delete(OldIdx,
Indexes0),
messages = rabbit_fifo_q:in(lo,
messages = rabbit_fifo_q:in(no,
?MSG(Idx, Header),
Messages),
enqueue_count = EnqCount + 1},
Expand Down Expand Up @@ -851,7 +851,7 @@ overview(#?STATE{consumers = Cons,
end,
MsgsRet = lqueue:len(Returns),
#{num_hi := MsgsHi,
num_lo := MsgsLo} = rabbit_fifo_q:overview(Messages),
num_no := MsgsNo} = rabbit_fifo_q:overview(Messages),

Overview = #{type => ?STATE,
config => Conf,
Expand All @@ -861,7 +861,7 @@ overview(#?STATE{consumers = Cons,
num_enqueuers => maps:size(Enqs),
num_ready_messages => messages_ready(State),
num_ready_messages_high => MsgsHi,
num_ready_messages_low => MsgsLo,
num_ready_messages_normal => MsgsNo,
num_ready_messages_return => MsgsRet,
num_messages => messages_total(State),
num_release_cursors => 0, %% backwards compat
Expand Down Expand Up @@ -2838,10 +2838,10 @@ priority_tag(Msg) ->
P > 4 ->
hi;
_ ->
lo
no
end;
false ->
lo
no
end.


Expand Down
52 changes: 26 additions & 26 deletions deps/rabbit/src/rabbit_fifo_q.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

%% a weighted priority queue with only two priorities

-record(?MODULE, {hi = ?EMPTY :: {list(msg()), list(msg())},
lo = ?EMPTY :: {list(msg()), list(msg())},
-record(?MODULE, {hi = ?EMPTY :: {list(msg()), list(msg())}, %% high
no = ?EMPTY :: {list(msg()), list(msg())}, %% normal
len = 0 :: non_neg_integer(),
dequeue_counter = 0 :: non_neg_integer()}).

Expand All @@ -31,20 +31,20 @@
new() ->
#?MODULE{}.

-spec in(hi | lo, msg(), state()) -> state().
-spec in(hi | no, msg(), state()) -> state().
in(hi, Item, #?MODULE{hi = Hi, len = Len} = State) ->
State#?MODULE{hi = in(Item, Hi),
len = Len + 1};
in(lo, Item, #?MODULE{lo = Lo, len = Len} = State) ->
State#?MODULE{lo = in(Item, Lo),
in(no, Item, #?MODULE{no = No, len = Len} = State) ->
State#?MODULE{no = in(Item, No),
len = Len + 1}.

-spec out(state()) ->
empty | {msg(), state()}.
out(#?MODULE{len = 0}) ->
empty;
out(#?MODULE{hi = Hi0,
lo = Lo0,
no = No0,
len = Len,
dequeue_counter = C0} = State) ->
C = case C0 of
Expand All @@ -58,8 +58,8 @@ out(#?MODULE{hi = Hi0,
{Msg, State#?MODULE{hi = drop(Hi0),
dequeue_counter = C,
len = Len - 1}};
{lo, Msg} ->
{Msg, State#?MODULE{lo = drop(Lo0),
{no, Msg} ->
{Msg, State#?MODULE{no = drop(No0),
dequeue_counter = C,
len = Len - 1}}
end.
Expand All @@ -78,21 +78,21 @@ len(#?MODULE{len = Len}) ->
-spec from_lqueue(lqueue:lqueue(msg())) -> state().
from_lqueue(LQ) ->
lqueue:fold(fun (Item, Acc) ->
in(lo, Item, Acc)
in(no, Item, Acc)
end, new(), LQ).

-spec get_lowest_index(state()) -> undefined | ra:index().
get_lowest_index(#?MODULE{len = 0}) ->
undefined;
get_lowest_index(#?MODULE{hi = Hi, lo = Lo}) ->
get_lowest_index(#?MODULE{hi = Hi, no = No}) ->
case peek(Hi) of
empty ->
?MSG(LoIdx, _) = peek(Lo),
LoIdx;
?MSG(NoIdx, _) = peek(No),
NoIdx;
?MSG(HiIdx, _) ->
case peek(Lo) of
?MSG(LoIdx, _) ->
min(HiIdx, LoIdx);
case peek(No) of
?MSG(NoIdx, _) ->
min(HiIdx, NoIdx);
empty ->
HiIdx
end
Expand All @@ -101,38 +101,38 @@ get_lowest_index(#?MODULE{hi = Hi, lo = Lo}) ->
-spec overview(state()) ->
#{len := non_neg_integer(),
num_hi := non_neg_integer(),
num_lo := non_neg_integer(),
num_no := non_neg_integer(),
lowest_index := ra:index()}.
overview(#?MODULE{len = Len,
hi = {Hi1, Hi2},
lo = _} = State) ->
no = _} = State) ->
%% TODO: this could be very slow with large backlogs,
%% consider keeping a separate counter for hi, lo messages
%% consider keeping a separate counter for 'hi', 'no' messages
NumHi = length(Hi1) + length(Hi2),
#{len => Len,
num_hi => NumHi,
num_lo => Len - NumHi,
num_no => Len - NumHi,
lowest_index => get_lowest_index(State)}.

%% internals

next(#?MODULE{hi = ?NON_EMPTY = Hi,
lo = ?NON_EMPTY = Lo,
no = ?NON_EMPTY = No,
dequeue_counter = ?WEIGHT}) ->
?MSG(HiIdx, _) = HiMsg = peek(Hi),
?MSG(LoIdx, _) = LoMsg = peek(Lo),
?MSG(NoIdx, _) = NoMsg = peek(No),
%% always favour hi priority messages when it is safe to do so,
%% i.e. the index is lower than the next index for the lo queue
case HiIdx < LoIdx of
%% i.e. the index is lower than the next index for the 'no' queue
case HiIdx < NoIdx of
true ->
{hi, HiMsg};
false ->
{lo, LoMsg}
{no, NoMsg}
end;
next(#?MODULE{hi = ?NON_EMPTY = Hi}) ->
{hi, peek(Hi)};
next(#?MODULE{lo = Lo}) ->
{lo, peek(Lo)}.
next(#?MODULE{no = No}) ->
{no, peek(No)}.

%% invariant, if the queue is non empty so is the Out (right) list.
in(X, ?EMPTY) ->
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,8 @@ handle_tick(QName,
Infos0 = maps:fold(
fun(num_ready_messages_high, V, Acc) ->
[{messages_ready_high, V} | Acc];
(num_ready_messages_low, V, Acc) ->
[{messages_ready_low, V} | Acc];
(num_ready_messages_normal, V, Acc) ->
[{messages_ready_normal, V} | Acc];
(num_ready_messages_return, V, Acc) ->
[{messages_ready_returned, V} | Acc];
(_, _, Acc) ->
Expand Down
14 changes: 7 additions & 7 deletions deps/rabbit/test/rabbit_fifo_q_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ basics(_Config) ->
rabbit_fifo_q:in(P, I, Q)
end, Q0, [
{hi, ?MSG(1)},
{lo, ?MSG(2)},
{no, ?MSG(2)},
{hi, ?MSG(3)},
{lo, ?MSG(4)},
{no, ?MSG(4)},
{hi, ?MSG(5)}
]),
{?MSG(1), Q2} = rabbit_fifo_q:out(Q1),
Expand All @@ -87,7 +87,7 @@ basics(_Config) ->

hi_is_prioritised(_Config) ->
Q0 = rabbit_fifo_q:new(),
%% when `hi' has a lower index than the next lo then it is still
%% when `hi' has a lower index than the next 'no' then it is still
%% prioritied (as this is safe to do).
Q1 = lists:foldl(
fun ({P, I}, Q) ->
Expand All @@ -97,7 +97,7 @@ hi_is_prioritised(_Config) ->
{hi, ?MSG(2)},
{hi, ?MSG(3)},
{hi, ?MSG(4)},
{lo, ?MSG(5)}
{no, ?MSG(5)}
]),
{?MSG(1), Q2} = rabbit_fifo_q:out(Q1),
{?MSG(2), Q3} = rabbit_fifo_q:out(Q2),
Expand All @@ -110,8 +110,8 @@ hi_is_prioritised(_Config) ->
get_lowest_index(_Config) ->
Q0 = rabbit_fifo_q:new(),
Q1 = rabbit_fifo_q:in(hi, ?MSG(1, ?LINE), Q0),
Q2 = rabbit_fifo_q:in(lo, ?MSG(2, ?LINE), Q1),
Q3 = rabbit_fifo_q:in(lo, ?MSG(3, ?LINE), Q2),
Q2 = rabbit_fifo_q:in(no, ?MSG(2, ?LINE), Q1),
Q3 = rabbit_fifo_q:in(no, ?MSG(3, ?LINE), Q2),
{_, Q4} = rabbit_fifo_q:out(Q3),
{_, Q5} = rabbit_fifo_q:out(Q4),
{_, Q6} = rabbit_fifo_q:out(Q5),
Expand All @@ -129,7 +129,7 @@ get_lowest_index(_Config) ->
single_priority_behaves_like_queue(_Config) ->
run_proper(
fun () ->
?FORALL({P, Ops}, {oneof([hi, lo]), op_gen(256)},
?FORALL({P, Ops}, {oneof([hi, no]), op_gen(256)},
queue_prop(P, Ops))
end, [], 25),
ok.
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
<th class="horizontal">Unacked</th>
<% if (is_quorum(queue)) { %>
<th class="horizontal">High priority</th>
<th class="horizontal">Low priority</th>
<th class="horizontal">Normal priority</th>
<th class="horizontal">Returned</th>
<th class="horizontal">Dead-lettered
<span class="help" id="queue-dead-lettered"></span>
Expand Down Expand Up @@ -163,7 +163,7 @@
<%= fmt_num_thousands(queue.messages_ready_high) %>
</td>
<td class="r">
<%= fmt_num_thousands(queue.messages_ready_low) %>
<%= fmt_num_thousands(queue.messages_ready_normal) %>
</td>
<td class="r">
<%= fmt_num_thousands(queue.messages_ready_returned) %>
Expand Down

0 comments on commit 4a3fa4d

Please sign in to comment.