Skip to content

Commit

Permalink
Bump ActiveMQ to 5.18.3
Browse files Browse the repository at this point in the history
Remove bean `logQuery` as described in
spring-attic/spring-native#1708 (comment)
to avoid ActiveMQ start up failure with reason
```
java.lang.ClassNotFoundException: io.fabric8.insight.log.log4j.Log4jLogQuery
```
  • Loading branch information
ansd committed Feb 21, 2024
1 parent 26068a0 commit 3187423
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 91 deletions.
2 changes: 1 addition & 1 deletion deps/amqp10_client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ HEX_TARBALL_FILES += rabbitmq-components.mk \
# ActiveMQ for the testsuite.
# --------------------------------------------------------------------

ACTIVEMQ_VERSION := 5.14.4
ACTIVEMQ_VERSION := 5.18.3
ACTIVEMQ_URL := 'https://archive.apache.org/dist/activemq/$(ACTIVEMQ_VERSION)/apache-activemq-$(ACTIVEMQ_VERSION)-bin.tar.gz'

ACTIVEMQ := $(abspath test/system_SUITE_data/apache-activemq-$(ACTIVEMQ_VERSION)/bin/activemq)
Expand Down
4 changes: 2 additions & 2 deletions deps/amqp10_client/activemq.bzl
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

ACTIVEMQ_VERSION = "5.14.4"
ACTIVEMQ_VERSION = "5.18.3"
ACTIVEMQ_URL = "https://archive.apache.org/dist/activemq/{version}/apache-activemq-{version}-bin.tar.gz".format(version = ACTIVEMQ_VERSION)
SHA_256 = "16ec52bece0a4759f9d70f4132d7d8da67d662e4af029081c492e65510a695c1"
SHA_256 = "943381aa6d340707de6c42eadbf7b41b7fdf93df604156d972d50c4da783544f"

def activemq_archive():
http_archive(
Expand Down
179 changes: 103 additions & 76 deletions deps/amqp10_client/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ shared() ->
split_transfer,
transfer_unsettled,
subscribe,
subscribe_with_auto_flow,
subscribe_with_auto_flow_settled,
subscribe_with_auto_flow_unsettled,
outgoing_heartbeat,
roundtrip_large_messages,
transfer_id_vs_delivery_id
Expand Down Expand Up @@ -502,7 +503,7 @@ transfer_unsettled(Config) ->
subscribe(Config) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
QueueName = <<"test-sub">>,
QueueName = atom_to_binary(?FUNCTION_NAME),
{ok, Connection} = amqp10_client:open_connection(Hostname, Port),
{ok, Session} = amqp10_client:begin_session(Connection),
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session,
Expand Down Expand Up @@ -530,104 +531,121 @@ subscribe(Config) ->
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).

subscribe_with_auto_flow(Config) ->
subscribe_with_auto_flow_settled(Config) ->
SenderSettleMode = settled,
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
QueueName = <<"test-sub">>,
QueueName = atom_to_binary(?FUNCTION_NAME),
{ok, Connection} = amqp10_client:open_connection(Hostname, Port),
{ok, Session} = amqp10_client:begin_session(Connection),
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session,
<<"sub-sender">>,
QueueName),
await_link(Sender, credited, link_credit_timeout),

_ = publish_messages(Sender, <<"banana">>, 20),
%% Use sender settle mode 'settled'.
{ok, R1} = amqp10_client:attach_receiver_link(
Session, <<"sub-receiver-1">>, QueueName, settled),
await_link(R1, attached, attached_timeout),
ok = amqp10_client:flow_link_credit(R1, 5, 2),
?assertEqual(20, count_received_messages(R1)),
ok = amqp10_client:detach_link(R1),
publish_messages(Sender, <<"banana">>, 20),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"sub-receiver">>, QueueName, SenderSettleMode),
await_link(Receiver, attached, attached_timeout),

ok = amqp10_client:flow_link_credit(Receiver, 5, 2),
?assertEqual(20, count_received_messages(Receiver)),

_ = publish_messages(Sender, <<"banana">>, 30),
ok = amqp10_client:detach_link(Receiver),
ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).

subscribe_with_auto_flow_unsettled(Config) ->
SenderSettleMode = unsettled,
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
QueueName = atom_to_binary(?FUNCTION_NAME),
{ok, Connection} = amqp10_client:open_connection(Hostname, Port),
{ok, Session} = amqp10_client:begin_session(Connection),
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session,
<<"sub-sender">>,
QueueName),
await_link(Sender, credited, link_credit_timeout),

_ = publish_messages(Sender, <<"1-">>, 30),
%% Use sender settle mode 'unsettled'.
%% This should require us to manually settle message in order to receive more messages.
{ok, R2} = amqp10_client:attach_receiver_link(Session, <<"sub-receiver-2">>, QueueName, unsettled),
await_link(R2, attached, attached_timeout),
ok = amqp10_client:flow_link_credit(R2, 5, 2),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"sub-receiver-2">>, QueueName, SenderSettleMode),
await_link(Receiver, attached, attached_timeout),
ok = amqp10_client:flow_link_credit(Receiver, 5, 2),
%% We should receive exactly 5 messages.
[M1, _M2, M3, M4, M5] = receive_messages(R2, 5),
ok = assert_no_message(R2),
[M1, _M2, M3, M4, M5] = receive_messages(Receiver, 5),
ok = assert_no_message(Receiver),

%% Even when we accept the first 3 messages, the number of unsettled messages has not yet fallen below 2.
%% Therefore, the client should not yet grant more credits to the sender.
ok = amqp10_client_session:disposition(
R2, amqp10_msg:delivery_id(M1), amqp10_msg:delivery_id(M3), true, accepted),
ok = assert_no_message(R2),
Receiver, amqp10_msg:delivery_id(M1), amqp10_msg:delivery_id(M3), true, accepted),
ok = assert_no_message(Receiver),

%% When we accept 1 more message (the order in which we accept shouldn't matter, here we accept M5 before M4),
%% the number of unsettled messages now falls below 2 (since only M4 is left unsettled).
%% Therefore, the client should grant 5 credits to the sender.
%% Therefore, we should receive 5 more messages.
ok = amqp10_client:accept_msg(R2, M5),
[_M6, _M7, _M8, _M9, M10] = receive_messages(R2, 5),
ok = assert_no_message(R2),
ok = amqp10_client:accept_msg(Receiver, M5),
[_M6, _M7, _M8, _M9, M10] = receive_messages(Receiver, 5),
ok = assert_no_message(Receiver),

%% It shouldn't matter how we settle messages, therefore we use 'rejected' this time.
%% Settling all in flight messages should cause us to receive exactly 5 more messages.
ok = amqp10_client_session:disposition(
R2, amqp10_msg:delivery_id(M4), amqp10_msg:delivery_id(M10), true, rejected),
[M11, _M12, _M13, _M14, M15] = receive_messages(R2, 5),
ok = assert_no_message(R2),
Receiver, amqp10_msg:delivery_id(M4), amqp10_msg:delivery_id(M10), true, rejected),
[M11, _M12, _M13, _M14, M15] = receive_messages(Receiver, 5),
ok = assert_no_message(Receiver),

%% Dynamically decrease link credit.
%% Since we explicitly tell to grant 3 new credits now, we expect to receive 3 more messages.
ok = amqp10_client:flow_link_credit(R2, 3, 3),
[M16, _M17, M18] = receive_messages(R2, 3),
ok = assert_no_message(R2),
ok = amqp10_client:flow_link_credit(Receiver, 3, 3),
[M16, _M17, M18] = receive_messages(Receiver, 3),
ok = assert_no_message(Receiver),

ok = amqp10_client_session:disposition(
R2, amqp10_msg:delivery_id(M11), amqp10_msg:delivery_id(M15), true, accepted),
Receiver, amqp10_msg:delivery_id(M11), amqp10_msg:delivery_id(M15), true, accepted),
%% However, the RenewWhenBelow=3 still refers to all unsettled messages.
%% Right now we have 3 messages (M16, M17, M18) unsettled.
ok = assert_no_message(R2),
ok = assert_no_message(Receiver),

%% Settling 1 out of these 3 messages causes RenewWhenBelow to fall below 3 resulting
%% in 3 new messages to be received.
ok = amqp10_client:accept_msg(R2, M18),
[_M19, _M20, _M21] = receive_messages(R2, 3),
ok = assert_no_message(R2),
ok = amqp10_client:accept_msg(Receiver, M18),
[_M19, _M20, _M21] = receive_messages(Receiver, 3),
ok = assert_no_message(Receiver),

ok = amqp10_client:flow_link_credit(R2, 3, never, true),
[_M22, _M23, M24] = receive_messages(R2, 3),
ok = assert_no_message(R2),
ok = amqp10_client:flow_link_credit(Receiver, 3, never, true),
[_M22, _M23, M24] = receive_messages(Receiver, 3),
ok = assert_no_message(Receiver),

%% Since RenewWhenBelow = never, we expect to receive no new messages despite settling.
ok = amqp10_client_session:disposition(
R2, amqp10_msg:delivery_id(M16), amqp10_msg:delivery_id(M24), true, rejected),
ok = assert_no_message(R2),
Receiver, amqp10_msg:delivery_id(M16), amqp10_msg:delivery_id(M24), true, rejected),
ok = assert_no_message(Receiver),

ok = amqp10_client:flow_link_credit(R2, 2, never, false),
[M25, _M26] = receive_messages(R2, 2),
ok = assert_no_message(R2),
ok = amqp10_client:flow_link_credit(Receiver, 2, never, false),
[M25, _M26] = receive_messages(Receiver, 2),
ok = assert_no_message(Receiver),

ok = amqp10_client:flow_link_credit(R2, 3, 3),
[_M27, _M28, M29] = receive_messages(R2, 3),
ok = assert_no_message(R2),
ok = amqp10_client:flow_link_credit(Receiver, 3, 3),
[_M27, _M28, M29] = receive_messages(Receiver, 3),
ok = assert_no_message(Receiver),

ok = amqp10_client_session:disposition(
R2, amqp10_msg:delivery_id(M25), amqp10_msg:delivery_id(M29), true, accepted),
[M30] = receive_messages(R2, 1),
ok = assert_no_message(R2),
ok = amqp10_client:accept_msg(R2, M30),
Receiver, amqp10_msg:delivery_id(M25), amqp10_msg:delivery_id(M29), true, accepted),
[M30] = receive_messages(Receiver, 1),
ok = assert_no_message(Receiver),
ok = amqp10_client:accept_msg(Receiver, M30),
%% The sender queue is empty now.
ok = assert_no_message(R2),
ok = assert_no_message(Receiver),

ok = amqp10_client:flow_link_credit(R2, 3, 1),
_ = publish_messages(Sender, <<"banana">>, 1),
[M31] = receive_messages(R2, 1),
ok = amqp10_client:accept_msg(R2, M31),
ok = amqp10_client:flow_link_credit(Receiver, 3, 1),
_ = publish_messages(Sender, <<"2-">>, 1),
[M31] = receive_messages(Receiver, 1),
ok = amqp10_client:accept_msg(Receiver, M31),

%% Since function flow_link_credit/3 documents
%% "if RenewWhenBelow is an integer, the amqp10_client will automatically grant more
Expand All @@ -637,24 +655,25 @@ subscribe_with_auto_flow(Config) ->
%% remaining link credit (2) and unsettled messages (0) is 2.
%%
%% Therefore, when we publish another 3 messages, we expect to only receive only 2 messages!
_ = publish_messages(Sender, <<"banana">>, 5),
[M32, M33] = receive_messages(R2, 2),
ok = assert_no_message(R2),
_ = publish_messages(Sender, <<"3-">>, 5),
[M32, M33] = receive_messages(Receiver, 2),
ok = assert_no_message(Receiver),

%% When we accept both messages, the sum of the remaining link credit (0) and unsettled messages (0)
%% falls below RenewWhenBelow=1 causing the amqp10_client to grant 3 new credits.
ok = amqp10_client:accept_msg(R2, M32),
ok = assert_no_message(R2),
ok = amqp10_client:accept_msg(R2, M33),

[M35, M36, M37] = receive_messages(R2, 3),
ok = amqp10_client:accept_msg(R2, M35),
ok = amqp10_client:accept_msg(R2, M36),
ok = amqp10_client:accept_msg(R2, M37),
ok = amqp10_client:accept_msg(Receiver, M32),
ok = assert_no_message(Receiver),
ok = amqp10_client:accept_msg(Receiver, M33),

[M35, M36, M37] = receive_messages(Receiver, 3),
ok = amqp10_client:accept_msg(Receiver, M35),
ok = amqp10_client:accept_msg(Receiver, M36),
ok = amqp10_client:accept_msg(Receiver, M37),
%% The sender queue is empty now.
ok = assert_no_message(R2),
ok = assert_no_message(Receiver),

ok = amqp10_client:detach_link(R2),
ok = amqp10_client:detach_link(Receiver),
ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).

Expand Down Expand Up @@ -817,18 +836,18 @@ await_link(Who, What, Err) ->
ok;
{amqp10_event, {link, Who0, {detached, Why}}}
when Who0 =:= Who ->
exit(Why)
ct:fail(Why)
after 5000 ->
flush(),
exit(Err)
ct:fail(Err)
end.

publish_messages(Sender, Data, Num) ->
publish_messages(Sender, BodyPrefix, Num) ->
[begin
Tag = integer_to_binary(T),
Msg = amqp10_msg:new(Tag, Data, false),
ok = amqp10_client:send_msg(Sender, Msg),
ok = await_disposition(Tag)
Tag = integer_to_binary(T),
Msg = amqp10_msg:new(Tag, <<BodyPrefix/binary, Tag/binary>>, false),
ok = amqp10_client:send_msg(Sender, Msg),
ok = await_disposition(Tag)
end || T <- lists:seq(1, Num)].

await_disposition(DeliveryTag) ->
Expand All @@ -847,7 +866,7 @@ count_received_messages0(Receiver, Count) ->
receive
{amqp10_msg, Receiver, _Msg} ->
count_received_messages0(Receiver, Count + 1)
after 200 ->
after 500 ->
Count
end.

Expand All @@ -861,7 +880,15 @@ receive_messages0(Receiver, N, Acc) ->
{amqp10_msg, Receiver, Msg} ->
receive_messages0(Receiver, N - 1, [Msg | Acc])
after 5000 ->
ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}})
LastReceivedMsg = case Acc of
[] -> none;
[M | _] -> M
end,
ct:fail({timeout,
{num_received, length(Acc)},
{num_missing, N},
{last_received_msg, LastReceivedMsg}
})
end.

assert_no_message(Receiver) ->
Expand Down
6 changes: 0 additions & 6 deletions deps/amqp10_client/test/system_SUITE_data/conf/activemq.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@
</property>
</bean>

<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>

<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@
</property>
</bean>

<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>

<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
Expand Down

0 comments on commit 3187423

Please sign in to comment.