Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use OpenTelemetry Instrumenter #1678

Merged
merged 2 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ public static TracingMetadata withCurrent(Context currentContext) {
return EMPTY;
}

public static TracingMetadata with(Context currentSpanContext, Context previousSpanContext) {
if (currentSpanContext != null || previousSpanContext != null) {
return new TracingMetadata(currentSpanContext, previousSpanContext);
}
return EMPTY;
}

@Deprecated
public TracingMetadata withSpan(Span span) {
if (span != null) {
return new TracingMetadata(Context.root().with(span), previousSpanContext);
Expand Down
26 changes: 7 additions & 19 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
<module>api</module>
<module>smallrye-reactive-messaging-provider</module>
<module>smallrye-reactive-messaging-in-memory</module>
<module>smallrye-reactive-messaging-otel</module>
<module>smallrye-reactive-messaging-kafka</module>
<module>smallrye-reactive-messaging-kafka-api</module>
<module>smallrye-reactive-messaging-kafka-test-companion</module>
Expand Down Expand Up @@ -264,26 +265,13 @@
<version>${jboss-log-manager.version}</version>
</dependency>

<!-- This BOM includes the opentelemetry-bom and the opentelemetry-bom-alpha -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry-semver.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-bom-alpha</artifactId>
<version>${opentelemetry.version}-alpha</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- From this project -->
Expand Down
14 changes: 5 additions & 9 deletions smallrye-reactive-messaging-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,13 @@
<version>${smallrye-vertx-mutiny-clients.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-otel</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,29 @@
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.amqp.fault.AmqpAccept;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailStop;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailureHandler;
import io.smallrye.reactive.messaging.amqp.fault.AmqpModifiedFailed;
import io.smallrye.reactive.messaging.amqp.fault.AmqpModifiedFailedAndUndeliverableHere;
import io.smallrye.reactive.messaging.amqp.fault.AmqpReject;
import io.smallrye.reactive.messaging.amqp.fault.AmqpRelease;
import io.smallrye.reactive.messaging.amqp.tracing.AmqpAttributesExtractor;
import io.smallrye.reactive.messaging.amqp.tracing.AmqpMessageTextMapGetter;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpReceiverOptions;
import io.vertx.amqp.AmqpSenderOptions;
Expand Down Expand Up @@ -114,8 +116,6 @@ public class AmqpConnector implements IncomingConnectorFactory, OutgoingConnecto

public static final String CONNECTOR_NAME = "smallrye-amqp";

static Tracer TRACER;

@Inject
private ExecutionHolder executionHolder;

Expand Down Expand Up @@ -154,6 +154,8 @@ public class AmqpConnector implements IncomingConnectorFactory, OutgoingConnecto
*/
private final Map<String, ConnectionHolder> holders = new ConcurrentHashMap<>();

private Instrumenter<AmqpMessage<?>, Void> instrumenter;

void setup(ExecutionHolder executionHolder) {
this.executionHolder = executionHolder;
}
Expand All @@ -164,7 +166,19 @@ void setup(ExecutionHolder executionHolder) {

@PostConstruct
void init() {
TRACER = GlobalOpenTelemetry.getTracerProvider().get("io.smallrye.reactive.messaging.amqp");
// TODO - radcortez - We may want to move this to the constructor injection. SR OTel provides CDI Producer for OTel
AmqpAttributesExtractor amqpAttributesExtractor = new AmqpAttributesExtractor();
MessagingAttributesGetter<AmqpMessage<?>, Void> messagingAttributesGetter = amqpAttributesExtractor
.getMessagingAttributesGetter();
InstrumenterBuilder<AmqpMessage<?>, Void> builder = Instrumenter.builder(GlobalOpenTelemetry.get(),
"io.smallrye.reactive.messaging",
MessagingSpanNameExtractor.create(messagingAttributesGetter, MessageOperation.RECEIVE));

instrumenter = builder
.addAttributesExtractor(
MessagingAttributesExtractor.create(messagingAttributesGetter, MessageOperation.RECEIVE))
.addAttributesExtractor(amqpAttributesExtractor)
.buildConsumerInstrumenter(AmqpMessageTextMapGetter.INSTANCE);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
Expand All @@ -187,7 +201,7 @@ private Multi<? extends Message<?>> getStreamOfMessages(AmqpReceiver receiver,

return Multi.createFrom().deferred(
() -> {
Multi<AmqpMessage<?>> stream = receiver.toMulti()
Multi<Message<?>> stream = receiver.toMulti()
.onItem().transformToUniAndConcatenate(m -> {
try {
return Uni.createFrom().item(new AmqpMessage<>(m, holder.getContext(), onNack,
Expand All @@ -199,7 +213,8 @@ private Multi<? extends Message<?>> getStreamOfMessages(AmqpReceiver receiver,
});

if (tracingEnabled) {
stream = stream.onItem().invoke(this::incomingTrace);
stream = stream.onItem()
.transform(m -> TracingUtils.traceIncoming(instrumenter, m, (AmqpMessage<?>) m));
}

return Multi.createBy().merging().streams(stream, processor);
Expand Down Expand Up @@ -446,32 +461,4 @@ public void reportFailure(String channel, Throwable reason) {
terminate(null);
}

private void incomingTrace(AmqpMessage<?> message) {
TracingMetadata tracingMetadata = TracingMetadata.fromMessage(message).orElse(TracingMetadata.empty());

final SpanBuilder spanBuilder = TRACER.spanBuilder(message.getAddress() + " receive")
.setSpanKind(SpanKind.CONSUMER);

// Handle possible parent span
final Context parentSpanContext = tracingMetadata.getPreviousContext();
if (parentSpanContext != null) {
spanBuilder.setParent(parentSpanContext);
} else {
spanBuilder.setNoParent();
}

final Span span = spanBuilder.startSpan();

// Set Span attributes
span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AMQP 1.0");
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, message.getAddress());
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue");

// Make available as parent for subsequent spans inside message processing
span.makeCurrent();

message.injectTracingMetadata(tracingMetadata.withSpan(span));

span.end();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@
import org.reactivestreams.Subscription;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.amqp.ce.AmqpCloudEventHelper;
import io.smallrye.reactive.messaging.amqp.tracing.HeaderInjectAdapter;
import io.smallrye.reactive.messaging.amqp.tracing.AmqpAttributesExtractor;
import io.smallrye.reactive.messaging.amqp.tracing.AmqpMessageTextMapSetter;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import io.vertx.amqp.impl.AmqpMessageImpl;
import io.vertx.mutiny.amqp.AmqpSender;

Expand All @@ -54,6 +55,8 @@ public class AmqpCreditBasedSender implements Processor<Message<?>, Message<?>>,
private final int retryAttempts;
private final int retryInterval;

private final Instrumenter<AmqpMessage<?>, Void> instrumenter;

private volatile boolean isAnonymous;

/**
Expand All @@ -79,6 +82,19 @@ public AmqpCreditBasedSender(AmqpConnector connector, ConnectionHolder holder,

this.retryAttempts = configuration.getReconnectAttempts();
this.retryInterval = configuration.getReconnectInterval();

AmqpAttributesExtractor amqpAttributesExtractor = new AmqpAttributesExtractor();
MessagingAttributesGetter<AmqpMessage<?>, Void> messagingAttributesGetter = amqpAttributesExtractor
.getMessagingAttributesGetter();
InstrumenterBuilder<AmqpMessage<?>, Void> builder = Instrumenter.builder(GlobalOpenTelemetry.get(),
"io.smallrye.reactive.messaging",
MessagingSpanNameExtractor.create(messagingAttributesGetter, MessageOperation.SEND));

instrumenter = builder
.addAttributesExtractor(
MessagingAttributesExtractor.create(messagingAttributesGetter, MessageOperation.SEND))
.addAttributesExtractor(amqpAttributesExtractor)
.buildProducerInstrumenter(AmqpMessageTextMapSetter.INSTANCE);
}

@Override
Expand Down Expand Up @@ -307,7 +323,9 @@ private Uni<Message<?>> send(AmqpSender sender, Message<?> msg, boolean durable,
amqp.getDelegate().unwrap().setAddress(actualAddress);
}

createOutgoingTrace(msg, amqp);
if (tracingEnabled) {
TracingUtils.traceOutgoing(instrumenter, msg, new AmqpMessage<>(amqp, null, null, false, true));
}

log.sendingMessageToAddress(actualAddress);
return sender.sendWithAck(amqp)
Expand All @@ -322,43 +340,6 @@ private Uni<Message<?>> send(AmqpSender sender, Message<?> msg, boolean durable,
.onItem().transform(x -> msg);
}

private void createOutgoingTrace(Message<?> msg, io.vertx.mutiny.amqp.AmqpMessage amqp) {
if (tracingEnabled) {
Optional<TracingMetadata> tracingMetadata = TracingMetadata.fromMessage(msg);

final SpanBuilder spanBuilder = AmqpConnector.TRACER.spanBuilder(amqp.address() + " send")
.setSpanKind(SpanKind.PRODUCER);

if (tracingMetadata.isPresent()) {
// Handle possible parent span
final Context parentSpanContext = tracingMetadata.get().getCurrentContext();
if (parentSpanContext != null) {
spanBuilder.setParent(parentSpanContext);
} else {
spanBuilder.setNoParent();
}
} else {
spanBuilder.setNoParent();
}

final Span span = spanBuilder.startSpan();
Scope scope = span.makeCurrent();

// Set Span attributes
span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AMQP 1.0");
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, amqp.address());
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue");
span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL, "AMQP");
span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL_VERSION, "1.0");

// Set span onto headers
GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
.inject(Context.current(), amqp, HeaderInjectAdapter.SETTER);
span.end();
scope.close();
}
}

private String getActualAddress(Message<?> message, io.vertx.mutiny.amqp.AmqpMessage amqp, String configuredAddress,
boolean isAnonymousSender) {
String address = amqp.address();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,17 @@
import org.apache.qpid.proton.message.MessageError;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.amqp.ce.AmqpCloudEventHelper;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailureHandler;
import io.smallrye.reactive.messaging.amqp.tracing.HeaderExtractAdapter;
import io.smallrye.reactive.messaging.ce.CloudEventMetadata;
import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.buffer.Buffer;

public class AmqpMessage<T> implements org.eclipse.microprofile.reactive.messaging.Message<T>, ContextAwareMessage<T> {
public class AmqpMessage<T> implements ContextAwareMessage<T>, MetadataInjectableMessage<T> {

protected static final String APPLICATION_JSON = "application/json";
protected final io.vertx.amqp.AmqpMessage message;
Expand Down Expand Up @@ -94,19 +92,6 @@ public AmqpMessage(io.vertx.amqp.AmqpMessage msg, Context context, AmqpFailureHa
payload = (T) convert(message);
}

if (tracingEnabled) {
TracingMetadata tracingMetadata = TracingMetadata.empty();
if (msg.applicationProperties() != null) {
// Read tracing headers
io.opentelemetry.context.Context otelContext = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
.extract(io.opentelemetry.context.Context.root(), msg.applicationProperties(),
HeaderExtractAdapter.GETTER);
tracingMetadata = TracingMetadata.withPrevious(otelContext);
}

meta.add(tracingMetadata);
}

this.metadata = captureContextMetadata(meta);
}

Expand Down Expand Up @@ -253,8 +238,9 @@ public Function<Throwable, CompletionStage<Void>> getNack() {
return this::nack;
}

public synchronized void injectTracingMetadata(TracingMetadata tracingMetadata) {
metadata = metadata.with(tracingMetadata);
@Override
public synchronized void injectMetadata(Object metadataObject) {
metadata = metadata.with(metadataObject);
}

}
Loading