-
Notifications
You must be signed in to change notification settings - Fork 870
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pulsar: use span links when receive telemetry is enabled #10650
Conversation
@@ -87,24 +99,28 @@ private static Instrumenter<PulsarBatchRequest, Void> createConsumerBatchReceive | |||
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE)) | |||
.addAttributesExtractor( | |||
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) | |||
.setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spring kafka instrumentation also produces telemetry with span links for the batch receive scenario that can't be modeled with just parent child relationships
span -> | ||
span.hasName(topic + " receive") | ||
.hasKind(SpanKind.CONSUMER) | ||
.hasParent(trace.getSpan(1)) | ||
.hasNoParent() | ||
.hasLinks(LinkData.create(producerSpan.get().getSpanContext())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this deviates a bit from kafka, here we place the span link on both receive
and process
spans but kafka places it only on the process
span.
...io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageListenerContext.java
Outdated
Show resolved
Hide resolved
...io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageListenerContext.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me. I had a couple of questions and a few small things I think could help improve readability....but otherwise seems nice a solid change.
.../java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java
Outdated
Show resolved
Hide resolved
.../java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java
Show resolved
Hide resolved
List<AttributeAssertion> assertions = | ||
new ArrayList<>( | ||
Arrays.asList( | ||
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "pulsar"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Static imports for improved readability.
.hasParent(trace.getSpan(1)))); | ||
} | ||
|
||
static List<AttributeAssertion> sendAttributes( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method name doesn't match the behavior. Maybe something more like makeSendAttributeAssertions()
or even sendAttrAssertions()
or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same pattern is used in kafka tests. In my opinion it is fine, these methods are only used in tests and attributes vs attribute assertions isn't a big difference.
return receiveAttributes(destination, messageId, testHeaders, false); | ||
} | ||
|
||
static List<AttributeAssertion> receiveAttributes( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment about the name. Maybe makeReceiveAttrAssertions()
.
...st/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java
Show resolved
Hide resolved
...pentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientSuppressReceiveSpansTest.java
Outdated
Show resolved
Hide resolved
consumer = | ||
client | ||
.newConsumer(Schema.STRING) | ||
.subscriptionName("test_sub") | ||
.topic(topic) | ||
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) | ||
.subscribe(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait, where does the "receive with timeout" part factor into this test? I'm not seeing it. If it's there, perhaps a comment can help to clarify, because a reader doesn't immediately see how the receive-with-timeout is wired up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I presume it is consumer.receive(1, TimeUnit.MINUTES);
Resolves #8859
Supersedes #8909
This PR alters spans produced by pulsar instrumentation to behave similarly to kafka instrumentation. If
receive-telemetry
is enabled place producer and consumer spans in different traces that are connected via span links. Ifreceive-telemetry
is not enabled place producer and consumer in the same trace and don't createreceive
span if the trace contains aprocess
span.