From ac74116d766b58f374d2d038997a54f706c5b641 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Tue, 9 Apr 2024 18:34:09 +0200 Subject: [PATCH] Instrument JmsTemplate#sendAndReceive Prior to this commit, the `JmsTemplate#sendAndReceive` method would not instrument the JMS session. This means that no metric would be recorded when sending the message and no trace would be propagated downstream. This commit ensures that the JMS session is instrumented in this case as well. Note, the reception of the response message does not create a `"jms.message.process"` observation as the session is only receiving the message, no listener has been configured on the message consumer. Fixes gh-32606 --- .../springframework/jms/core/JmsTemplate.java | 5 +- .../jms/core/JmsTemplateObservationTests.java | 55 ++++++++++++++++++- 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java b/spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java index 007f0a3c0d1f..ecae541c4e31 100644 --- a/spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java +++ b/spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -971,6 +971,9 @@ private T executeLocal(SessionCallback action, boolean startConnection) t try { con = createConnection(); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + if (micrometerJakartaPresent && this.observationRegistry != null) { + session = MicrometerInstrumentation.instrumentSession(session, this.observationRegistry); + } if (startConnection) { con.start(); } diff --git a/spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateObservationTests.java b/spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateObservationTests.java index b4fa023d3bf6..99ae2fe426a1 100644 --- a/spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateObservationTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateObservationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,9 +20,14 @@ import java.util.concurrent.TimeUnit; import io.micrometer.observation.tck.TestObservationRegistry; +import jakarta.jms.Destination; +import jakarta.jms.JMSException; +import jakarta.jms.Message; import jakarta.jms.MessageConsumer; +import jakarta.jms.Session; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -81,6 +86,54 @@ void shouldRecordJmsProcessObservations() { .hasHighCardinalityKeyValue("messaging.destination.name", "spring.test.observation"); } + @Test + void shouldRecordJmsPublishAndProcessObservations() throws Exception { + JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); + jmsTemplate.setObservationRegistry(registry); + + new Thread(() -> { + jmsTemplate.execute(session -> { + try { + CountDownLatch latch = new CountDownLatch(1); + MessageConsumer mc = session.createConsumer(session.createQueue("spring.test.observation")); + mc.setMessageListener(message -> { + try { + Destination jmsReplyTo = message.getJMSReplyTo(); + jmsTemplate.send(jmsReplyTo, new MessageCreator() { + @Override + public Message createMessage(Session session) throws JMSException { + latch.countDown(); + return session.createTextMessage("response content"); + } + }); + } + catch (JMSException e) { + throw new RuntimeException(e); + } + }); + return latch.await(2, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, true); + + }).start(); + Message response = jmsTemplate.sendAndReceive("spring.test.observation", new MessageCreator() { + @Override + public Message createMessage(Session session) throws JMSException { + return session.createTextMessage("request content"); + } + }); + + String responseBody = response.getBody(String.class); + Assertions.assertThat(responseBody).isEqualTo("response content"); + + assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.publish", 2); + assertThat(registry).hasObservationWithNameEqualTo("jms.message.process").that() + .hasHighCardinalityKeyValue("messaging.destination.name", "spring.test.observation"); + } + @AfterEach void shutdownServer() { connectionFactory.close();