Skip to content

Commit

Permalink
Instrument JmsTemplate#sendAndReceive
Browse files Browse the repository at this point in the history
Prior to this commit, the `JmsTemplate#sendAndReceive` method would not
instrument the JMS session. This means that no metric would be recorded
when sending the message and no trace would be propagated downstream.

This commit ensures that the JMS session is instrumented in this case as
well. Note, the reception of the response message does not create a
`"jms.message.process"` observation as the session is only receiving the
message, no listener has been configured on the message consumer.

Fixes gh-32606
  • Loading branch information
bclozel committed Apr 9, 2024
1 parent c5590ae commit ac74116
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -971,6 +971,9 @@ private <T> T executeLocal(SessionCallback<T> action, boolean startConnection) t
try {
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (micrometerJakartaPresent && this.observationRegistry != null) {
session = MicrometerInstrumentation.instrumentSession(session, this.observationRegistry);
}
if (startConnection) {
con.start();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,9 +20,14 @@
import java.util.concurrent.TimeUnit;

import io.micrometer.observation.tck.TestObservationRegistry;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -81,6 +86,54 @@ void shouldRecordJmsProcessObservations() {
.hasHighCardinalityKeyValue("messaging.destination.name", "spring.test.observation");
}

@Test
void shouldRecordJmsPublishAndProcessObservations() throws Exception {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setObservationRegistry(registry);

new Thread(() -> {
jmsTemplate.execute(session -> {
try {
CountDownLatch latch = new CountDownLatch(1);
MessageConsumer mc = session.createConsumer(session.createQueue("spring.test.observation"));
mc.setMessageListener(message -> {
try {
Destination jmsReplyTo = message.getJMSReplyTo();
jmsTemplate.send(jmsReplyTo, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
latch.countDown();
return session.createTextMessage("response content");
}
});
}
catch (JMSException e) {
throw new RuntimeException(e);
}
});
return latch.await(2, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, true);

}).start();
Message response = jmsTemplate.sendAndReceive("spring.test.observation", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("request content");
}
});

String responseBody = response.getBody(String.class);
Assertions.assertThat(responseBody).isEqualTo("response content");

assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.publish", 2);
assertThat(registry).hasObservationWithNameEqualTo("jms.message.process").that()
.hasHighCardinalityKeyValue("messaging.destination.name", "spring.test.observation");
}

@AfterEach
void shutdownServer() {
connectionFactory.close();
Expand Down

0 comments on commit ac74116

Please sign in to comment.