From ec6800989ade9ce3251f82f7af25a1f0cd05f4db Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Wed, 5 Oct 2022 18:11:28 +0200 Subject: [PATCH] Link JMS receive span with the producer span (#6804) Resolves #6779 In JMS you can have either the consumer receive span or the consumer process span (unlike Kafka, where the process span is always there and the receive span is just an addition) - in scenarios where polling (receive) is used, I think it makes sense to add links to the producer span to preserve the producer-consumer connection. Current messaging semantic conventions don't really describe a situation like this one, but the https://github.com/open-telemetry/oteps/pull/220 OTEP mentions that links might be used in a scenario like this one - which makes me think that adding links here might be a not that bad idea. --- .../src/jms2Test/groovy/Jms2Test.groovy | 88 +++++++++---- .../JmsMessageConsumerInstrumentation.java | 11 +- .../instrumentation/jms/JmsSingletons.java | 5 + .../javaagent/src/test/groovy/Jms1Test.groovy | 117 +++++++++++------- 4 files changed, 150 insertions(+), 71 deletions(-) diff --git a/instrumentation/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy b/instrumentation/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy index c252901af95c..c2850647fe7f 100644 --- a/instrumentation/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy +++ b/instrumentation/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy @@ -3,7 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -import com.google.common.io.Files import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.instrumentation.test.asserts.TraceAssert import io.opentelemetry.sdk.trace.data.SpanData @@ -26,6 +25,7 @@ import javax.jms.Message import javax.jms.MessageListener import javax.jms.Session import javax.jms.TextMessage +import java.nio.file.Files import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference @@ -43,7 +43,7 @@ class Jms2Test extends AgentInstrumentationSpecification { HornetQTextMessage message = session.createTextMessage(messageText) def setupSpec() { - def tempDir = Files.createTempDir() + def tempDir = Files.createTempDirectory("jmsTempDir").toFile() tempDir.deleteOnExit() Configuration config = new ConfigurationImpl() @@ -86,19 +86,34 @@ class Jms2Test extends AgentInstrumentationSpecification { def producer = session.createProducer(destination) def consumer = session.createConsumer(destination) - producer.send(message) + runWithSpan("producer parent") { + producer.send(message) + } - TextMessage receivedMessage = consumer.receive() + TextMessage receivedMessage = runWithSpan("consumer parent") { + return consumer.receive() as TextMessage + } String messageId = receivedMessage.getJMSMessageID() expect: receivedMessage.text == messageText assertTraces(2) { - trace(0, 1) { - producerSpan(it, 0, destinationType, destinationName) + SpanData producerSpanData + trace(0, 2) { + span(0) { + name "producer parent" + hasNoParent() + } + producerSpan(it, 1, destinationType, destinationName, span(0)) + + producerSpanData = span(1) } - trace(1, 1) { - consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive") + trace(1, 2) { + span(0) { + name "consumer parent" + hasNoParent() + } + consumerSpan(it, 1, destinationType, destinationName, messageId, "receive", span(0), producerSpanData) } } @@ -124,18 +139,24 @@ class Jms2Test extends AgentInstrumentationSpecification { @Override void onMessage(Message message) { lock.await() // ensure the producer trace is reported first. - messageRef.set(message) + messageRef.set(message as TextMessage) } } - producer.send(message) + runWithSpan("parent") { + producer.send(message) + } lock.countDown() expect: assertTraces(1) { - trace(0, 2) { - producerSpan(it, 0, destinationType, destinationName) - consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process") + trace(0, 3) { + span(0) { + name "parent" + hasNoParent() + } + producerSpan(it, 1, destinationType, destinationName, span(0)) + consumerSpan(it, 2, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(1)) } } // This check needs to go after all traces have been accounted for @@ -158,7 +179,7 @@ class Jms2Test extends AgentInstrumentationSpecification { def consumer = session.createConsumer(destination) // Receive with timeout - TextMessage receivedMessage = consumer.receiveNoWait() + Message receivedMessage = consumer.receiveNoWait() expect: receivedMessage == null @@ -179,7 +200,7 @@ class Jms2Test extends AgentInstrumentationSpecification { def consumer = session.createConsumer(destination) // Receive with timeout - TextMessage receivedMessage = consumer.receive(100) + Message receivedMessage = consumer.receive(100) expect: receivedMessage == null @@ -206,19 +227,25 @@ class Jms2Test extends AgentInstrumentationSpecification { @Override void onMessage(Message message) { lock.await() // ensure the producer trace is reported first. - messageRef.set(message) + messageRef.set(message as TextMessage) } } when: - producer.send(destination, message) + runWithSpan("parent") { + producer.send(destination, message) + } lock.countDown() then: assertTraces(1) { - trace(0, 2) { - producerSpan(it, 0, destinationType, destinationName) - consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process") + trace(0, 3) { + span(0) { + name "parent" + hasNoParent() + } + producerSpan(it, 1, destinationType, destinationName, span(0)) + consumerSpan(it, 2, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(1)) } } // This check needs to go after all traces have been accounted for @@ -236,11 +263,15 @@ class Jms2Test extends AgentInstrumentationSpecification { session.createTemporaryTopic() | "topic" | "(temporary)" } - static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) { + static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, SpanData parentSpan = null) { trace.span(index) { name destinationName + " send" kind PRODUCER - hasNoParent() + if (parentSpan == null) { + hasNoParent() + } else { + childOf(parentSpan) + } attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "jms" "$SemanticAttributes.MESSAGING_DESTINATION" destinationName @@ -256,14 +287,19 @@ class Jms2Test extends AgentInstrumentationSpecification { // passing messageId = null will verify message.id is not captured, // passing messageId = "" will verify message.id is captured (but won't verify anything about the value), // any other value for messageId will verify that message.id is captured and has that same value - static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) { + static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, String operation, SpanData parentSpan, SpanData linkedSpan = null) { trace.span(index) { name destinationName + " " + operation kind CONSUMER - if (parentOrLinkedSpan != null) { - childOf((SpanData) parentOrLinkedSpan) - } else { + if (parentSpan == null) { hasNoParent() + } else { + childOf(parentSpan) + } + if (linkedSpan == null) { + hasNoLinks() + } else { + hasLink(linkedSpan) } attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "jms" diff --git a/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageConsumerInstrumentation.java b/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageConsumerInstrumentation.java index c204ee6be7e5..337b872d9b8e 100644 --- a/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageConsumerInstrumentation.java +++ b/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageConsumerInstrumentation.java @@ -10,6 +10,7 @@ import static io.opentelemetry.javaagent.instrumentation.jms.JmsSingletons.consumerInstrumenter; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import io.opentelemetry.context.Context; @@ -37,10 +38,16 @@ public ElementMatcher typeMatcher() { @Override public void transform(TypeTransformer transformer) { transformer.applyAdviceToMethod( - named("receive").and(takesArguments(0).or(takesArguments(1))).and(isPublic()), + named("receive") + .and(takesArguments(0).or(takesArguments(1))) + .and(returns(named("javax.jms.Message"))) + .and(isPublic()), JmsMessageConsumerInstrumentation.class.getName() + "$ConsumerAdvice"); transformer.applyAdviceToMethod( - named("receiveNoWait").and(takesArguments(0)).and(isPublic()), + named("receiveNoWait") + .and(takesArguments(0)) + .and(returns(named("javax.jms.Message"))) + .and(isPublic()), JmsMessageConsumerInstrumentation.class.getName() + "$ConsumerAdvice"); } diff --git a/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsSingletons.java b/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsSingletons.java index be32341f0d4f..6cd1c0803e22 100644 --- a/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsSingletons.java +++ b/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsSingletons.java @@ -12,6 +12,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; public final class JmsSingletons { @@ -47,6 +48,10 @@ private static Instrumenter buildConsumerInstrumen MessagingSpanNameExtractor.create(getter, operation)) .addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation)) .setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) + .addSpanLinksExtractor( + new PropagatorBasedSpanLinksExtractor<>( + GlobalOpenTelemetry.getPropagators().getTextMapPropagator(), + MessagePropertyGetter.INSTANCE)) .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } diff --git a/instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy b/instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy index 7df59e550983..d181f815626e 100644 --- a/instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy +++ b/instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy @@ -61,19 +61,34 @@ class Jms1Test extends AgentInstrumentationSpecification { def producer = session.createProducer(destination) def consumer = session.createConsumer(destination) - producer.send(message) + runWithSpan("producer parent") { + producer.send(message) + } - TextMessage receivedMessage = consumer.receive() + TextMessage receivedMessage = runWithSpan("consumer parent") { + return consumer.receive() as TextMessage + } String messageId = receivedMessage.getJMSMessageID() expect: receivedMessage.text == messageText assertTraces(2) { - trace(0, 1) { - producerSpan(it, 0, destinationType, destinationName) + SpanData producerSpanData + trace(0, 2) { + span(0) { + name "producer parent" + hasNoParent() + } + producerSpan(it, 1, destinationType, destinationName, span(0)) + + producerSpanData = span(1) } - trace(1, 1) { - consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive") + trace(1, 2) { + span(0) { + name "consumer parent" + hasNoParent() + } + consumerSpan(it, 1, destinationType, destinationName, messageId, "receive", span(0), producerSpanData) } } @@ -99,7 +114,7 @@ class Jms1Test extends AgentInstrumentationSpecification { @Override void onMessage(Message message) { lock.await() // ensure the producer trace is reported first. - messageRef.set(message) + messageRef.set(message as TextMessage) } } @@ -110,7 +125,7 @@ class Jms1Test extends AgentInstrumentationSpecification { assertTraces(1) { trace(0, 2) { producerSpan(it, 0, destinationType, destinationName) - consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process") + consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(0)) } } // This check needs to go after all traces have been accounted for @@ -133,7 +148,7 @@ class Jms1Test extends AgentInstrumentationSpecification { def consumer = session.createConsumer(destination) // Receive with timeout - TextMessage receivedMessage = consumer.receiveNoWait() + Message receivedMessage = consumer.receiveNoWait() expect: receivedMessage == null @@ -154,7 +169,7 @@ class Jms1Test extends AgentInstrumentationSpecification { def consumer = session.createConsumer(destination) // Receive with timeout - TextMessage receivedMessage = consumer.receive(100) + Message receivedMessage = consumer.receive(100) expect: receivedMessage == null @@ -183,7 +198,7 @@ class Jms1Test extends AgentInstrumentationSpecification { and: producer.send(message) - TextMessage receivedMessage = consumer.receive() + TextMessage receivedMessage = consumer.receive() as TextMessage then: receivedMessage.text == messageText @@ -196,21 +211,7 @@ class Jms1Test extends AgentInstrumentationSpecification { producerSpan(it, 0, destinationType, destinationName) } trace(1, 1) { - span(0) { - hasNoParent() - name destinationName + " receive" - kind CONSUMER - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "jms" - "$SemanticAttributes.MESSAGING_DESTINATION" destinationName - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" destinationType - "$SemanticAttributes.MESSAGING_MESSAGE_ID" receivedMessage.getJMSMessageID() - "$SemanticAttributes.MESSAGING_OPERATION" "receive" - if (destinationName == "(temporary)") { - "$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true - } - } - } + consumerSpan(it, 0, destinationType, destinationName, "", "receive", null) } } @@ -237,19 +238,25 @@ class Jms1Test extends AgentInstrumentationSpecification { @Override void onMessage(Message message) { lock.await() // ensure the producer trace is reported first. - messageRef.set(message) + messageRef.set(message as TextMessage) } } when: - producer.send(destination, message) + runWithSpan("parent") { + producer.send(destination, message) + } lock.countDown() then: assertTraces(1) { - trace(0, 2) { - producerSpan(it, 0, destinationType, destinationName) - consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process") + trace(0, 3) { + span(0) { + name "parent" + hasNoParent() + } + producerSpan(it, 1, destinationType, destinationName, span(0)) + consumerSpan(it, 2, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(1)) } } // This check needs to go after all traces have been accounted for @@ -278,19 +285,34 @@ class Jms1Test extends AgentInstrumentationSpecification { def message = session.createTextMessage(messageText) message.setStringProperty("test-message-header", "test") message.setIntProperty("test-message-int-header", 1234) - producer.send(message) + runWithSpan("producer parent") { + producer.send(message) + } - TextMessage receivedMessage = consumer.receive() + TextMessage receivedMessage = runWithSpan("consumer parent") { + return consumer.receive() as TextMessage + } String messageId = receivedMessage.getJMSMessageID() expect: receivedMessage.text == messageText assertTraces(2) { - trace(0, 1) { - producerSpan(it, 0, destinationType, destinationName, true) + SpanData producerSpanData + trace(0, 2) { + span(0) { + name "producer parent" + hasNoParent() + } + producerSpan(it, 1, destinationType, destinationName, span(0), true) + + producerSpanData = span(1) } - trace(1, 1) { - consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive", true) + trace(1, 2) { + span(0) { + name "consumer parent" + hasNoParent() + } + consumerSpan(it, 1, destinationType, destinationName, messageId, "receive", span(0), producerSpanData, true) } } @@ -299,11 +321,15 @@ class Jms1Test extends AgentInstrumentationSpecification { consumer.close() } - static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, boolean testHeaders = false) { + static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, SpanData parentSpan = null, boolean testHeaders = false) { trace.span(index) { name destinationName + " send" kind PRODUCER - hasNoParent() + if (parentSpan == null) { + hasNoParent() + } else { + childOf(parentSpan) + } attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "jms" "$SemanticAttributes.MESSAGING_DESTINATION" destinationName @@ -323,14 +349,19 @@ class Jms1Test extends AgentInstrumentationSpecification { // passing messageId = null will verify message.id is not captured, // passing messageId = "" will verify message.id is captured (but won't verify anything about the value), // any other value for messageId will verify that message.id is captured and has that same value - static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation, boolean testHeaders = false) { + static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, String operation, SpanData parentSpan, SpanData linkedSpan = null, boolean testHeaders = false) { trace.span(index) { name destinationName + " " + operation kind CONSUMER - if (parentOrLinkedSpan != null) { - childOf((SpanData) parentOrLinkedSpan) - } else { + if (parentSpan == null) { hasNoParent() + } else { + childOf(parentSpan) + } + if (linkedSpan == null) { + hasNoLinks() + } else { + hasLink(linkedSpan) } attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "jms"