Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulsar: use span links when receive telemetry is enabled #10650

Merged
merged 8 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion instrumentation/pulsar/pulsar-2.8/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,28 @@ dependencies {
testImplementation("org.apache.pulsar:pulsar-client-admin:2.8.0")
}

tasks {
val testReceiveSpanDisabled by registering(Test::class) {
filter {
includeTestsMatching("PulsarClientSuppressReceiveSpansTest")
}
include("**/PulsarClientSuppressReceiveSpansTest.*")
}

test {
filter {
excludeTestsMatching("PulsarClientSuppressReceiveSpansTest")
}
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}

check {
dependsOn(testReceiveSpanDisabled)
}
}

tasks.withType<Test>().configureEach {
// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.MessageListenerContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class ConsumerBaseInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.pulsar.client.impl.ConsumerBase")
.or(named("org.apache.pulsar.client.impl.MultiTopicsConsumerImpl"));
}

@Override
public void transform(TypeTransformer transformer) {
// these methods receive a message and pass it on to a message listener
// we instrument them so that the span for the receive operation could be suppressed
transformer.applyAdviceToMethod(
named("triggerListener").and(takesArguments(0)).or(named("receiveMessageFromConsumer")),
this.getClass().getName() + "$TriggerListenerAdvice");
}

@SuppressWarnings("unused")
public static class TriggerListenerAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
MessageListenerContext.startProcessing();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit() {
MessageListenerContext.endProcessing();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public PulsarInstrumentationModule() {
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Arrays.asList(
new ConsumerBaseInstrumentation(),
new ConsumerImplInstrumentation(),
new ProducerImplInstrumentation(),
new MessageInstrumentation(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry;

/**
* Helper class used to determine whether message is going to be processed by a listener. If we know
* that message is going to be passed to a message listener, that would produce a span for the
* "process" operation, we are going to suppress the span from the message "receive" operation.
*/
public final class MessageListenerContext {
private static final ThreadLocal<Boolean> processing = new ThreadLocal<>();

private MessageListenerContext() {}

/** Call on entry to a method that will pass the received message to a message listener. */
public static void startProcessing() {
processing.set(Boolean.TRUE);
}

public static void endProcessing() {
processing.remove();
}

/** Returns true if we expect a received message to be passed to a listener. */
public static boolean isProcessing() {
return processing.get() != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
Expand All @@ -38,6 +40,8 @@ public final class PulsarSingletons {
TELEMETRY.getPropagators().getTextMapPropagator();
private static final List<String> capturedHeaders =
ExperimentalConfig.get().getMessagingHeaders();
private static final boolean receiveInstrumentationEnabled =
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();

private static final Instrumenter<PulsarRequest, Void> CONSUMER_PROCESS_INSTRUMENTER =
createConsumerProcessInstrumenter();
Expand All @@ -64,15 +68,23 @@ private static Instrumenter<PulsarRequest, Void> createConsumerReceiveInstrument
MessagingAttributesGetter<PulsarRequest, Void> getter =
PulsarMessagingAttributesGetter.INSTANCE;

return Instrumenter.<PulsarRequest, Void>builder(
TELEMETRY,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
.buildConsumerInstrumenter(MessageTextMapGetter.INSTANCE);
InstrumenterBuilder<PulsarRequest, Void> instrumenterBuilder =
Instrumenter.<PulsarRequest, Void>builder(
TELEMETRY,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()));

if (receiveInstrumentationEnabled) {
return instrumenterBuilder
.addSpanLinksExtractor(
new PropagatorBasedSpanLinksExtractor<>(PROPAGATOR, MessageTextMapGetter.INSTANCE))
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}
return instrumenterBuilder.buildConsumerInstrumenter(MessageTextMapGetter.INSTANCE);
}

private static Instrumenter<PulsarBatchRequest, Void> createConsumerBatchReceiveInstrumenter() {
Expand All @@ -87,24 +99,29 @@ private static Instrumenter<PulsarBatchRequest, Void> createConsumerBatchReceive
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
.setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spring kafka instrumentation also produces telemetry with span links for the batch receive scenario that can't be modeled with just parent child relationships

.addSpanLinksExtractor(
new PulsarBatchRequestSpanLinksExtractor(
GlobalOpenTelemetry.getPropagators().getTextMapPropagator()))
.addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(PROPAGATOR))
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

private static Instrumenter<PulsarRequest, Void> createConsumerProcessInstrumenter() {
MessagingAttributesGetter<PulsarRequest, Void> getter =
PulsarMessagingAttributesGetter.INSTANCE;

return Instrumenter.<PulsarRequest, Void>builder(
TELEMETRY,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, MessageOperation.PROCESS))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.PROCESS))
.buildInstrumenter();
InstrumenterBuilder<PulsarRequest, Void> instrumenterBuilder =
Instrumenter.<PulsarRequest, Void>builder(
TELEMETRY,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, MessageOperation.PROCESS))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.PROCESS));

if (receiveInstrumentationEnabled) {
SpanLinksExtractor<PulsarRequest> spanLinksExtractor =
new PropagatorBasedSpanLinksExtractor<>(PROPAGATOR, MessageTextMapGetter.INSTANCE);
instrumenterBuilder.addSpanLinksExtractor(spanLinksExtractor);
return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}
return instrumenterBuilder.buildConsumerInstrumenter(MessageTextMapGetter.INSTANCE);
}

private static Instrumenter<PulsarRequest, Void> createProducerInstrumenter() {
Expand Down Expand Up @@ -146,12 +163,17 @@ public static Context startAndEndConsumerReceive(
if (!CONSUMER_RECEIVE_INSTRUMENTER.shouldStart(parent, request)) {
return null;
}
// startAndEnd not supports extract trace context from carrier
// start not supports custom startTime
// extract trace context by using TEXT_MAP_PROPAGATOR here.
if (!receiveInstrumentationEnabled) {
// suppress receive span when receive telemetry is not enabled and message is going to be
// processed by a listener
if (MessageListenerContext.isProcessing()) {
return null;
}
parent = PROPAGATOR.extract(parent, request, MessageTextMapGetter.INSTANCE);
}
return InstrumenterUtil.startAndEnd(
CONSUMER_RECEIVE_INSTRUMENTER,
PROPAGATOR.extract(parent, request, MessageTextMapGetter.INSTANCE),
parent,
laurit marked this conversation as resolved.
Show resolved Hide resolved
request,
null,
throwable,
Expand Down Expand Up @@ -185,11 +207,17 @@ private static Context startAndEndConsumerReceive(

public static CompletableFuture<Message<?>> wrap(
CompletableFuture<Message<?>> future, Timer timer, Consumer<?> consumer) {
boolean listenerContextActive = MessageListenerContext.isProcessing();
Context parent = Context.current();
CompletableFuture<Message<?>> result = new CompletableFuture<>();
future.whenComplete(
(message, throwable) -> {
Context context = startAndEndConsumerReceive(parent, message, timer, consumer, throwable);
// we create a "receive" span when receive telemetry is enabled or when we know that
// this message will not be passed to a listener that would create the "process" span
Context context =
receiveInstrumentationEnabled || !listenerContextActive
? startAndEndConsumerReceive(parent, message, timer, consumer, throwable)
: parent;
runWithContext(
context,
() -> {
Expand Down
Loading
Loading