Skip to content

Commit

Permalink
Introduce smallrye-reactive-messaging-otel module for common OTel ins…
Browse files Browse the repository at this point in the history
…trumenter code.
  • Loading branch information
ozangunalp committed Jan 3, 2023
1 parent 2ce1050 commit 877bb69
Show file tree
Hide file tree
Showing 28 changed files with 282 additions and 380 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
<module>api</module>
<module>smallrye-reactive-messaging-provider</module>
<module>smallrye-reactive-messaging-in-memory</module>
<module>smallrye-reactive-messaging-otel</module>
<module>smallrye-reactive-messaging-kafka</module>
<module>smallrye-reactive-messaging-kafka-api</module>
<module>smallrye-reactive-messaging-kafka-test-companion</module>
Expand Down
22 changes: 5 additions & 17 deletions smallrye-reactive-messaging-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,13 @@
<version>${smallrye-vertx-mutiny-clients.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-otel</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api-semconv</artifactId>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
Expand All @@ -49,7 +47,6 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.amqp.fault.AmqpAccept;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailStop;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailureHandler;
Expand All @@ -63,6 +60,7 @@
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpReceiverOptions;
import io.vertx.amqp.AmqpSenderOptions;
Expand Down Expand Up @@ -203,7 +201,7 @@ private Multi<? extends Message<?>> getStreamOfMessages(AmqpReceiver receiver,

return Multi.createFrom().deferred(
() -> {
Multi<AmqpMessage<?>> stream = receiver.toMulti()
Multi<Message<?>> stream = receiver.toMulti()
.onItem().transformToUniAndConcatenate(m -> {
try {
return Uni.createFrom().item(new AmqpMessage<>(m, holder.getContext(), onNack,
Expand All @@ -215,7 +213,8 @@ private Multi<? extends Message<?>> getStreamOfMessages(AmqpReceiver receiver,
});

if (tracingEnabled) {
stream = stream.onItem().invoke(this::incomingTrace);
stream = stream.onItem()
.transform(m -> TracingUtils.traceIncoming(instrumenter, m, (AmqpMessage<?>) m));
}

return Multi.createBy().merging().streams(stream, processor);
Expand Down Expand Up @@ -462,28 +461,4 @@ public void reportFailure(String channel, Throwable reason) {
terminate(null);
}

private void incomingTrace(AmqpMessage<?> message) {
TracingMetadata tracingMetadata = TracingMetadata.fromMessage(message).orElse(TracingMetadata.empty());

Context parentContext = tracingMetadata.getPreviousContext();
if (parentContext == null) {
parentContext = Context.current();
}
Context spanContext;
Scope scope = null;

boolean shouldStart = instrumenter.shouldStart(parentContext, message);
if (shouldStart) {
try {
spanContext = instrumenter.start(parentContext, message);
scope = spanContext.makeCurrent();
message.injectTracingMetadata(TracingMetadata.with(spanContext, parentContext));
instrumenter.end(spanContext, message, null, null);
} finally {
if (scope != null) {
scope.close();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import org.reactivestreams.Subscription;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
Expand All @@ -28,11 +26,11 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.amqp.ce.AmqpCloudEventHelper;
import io.smallrye.reactive.messaging.amqp.tracing.AmqpAttributesExtractor;
import io.smallrye.reactive.messaging.amqp.tracing.AmqpMessageTextMapSetter;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import io.vertx.amqp.impl.AmqpMessageImpl;
import io.vertx.mutiny.amqp.AmqpSender;

Expand Down Expand Up @@ -326,7 +324,7 @@ private Uni<Message<?>> send(AmqpSender sender, Message<?> msg, boolean durable,
}

if (tracingEnabled) {
createOutgoingTrace(msg, amqp);
TracingUtils.traceOutgoing(instrumenter, msg, new AmqpMessage<>(amqp, null, null, false, true));
}

log.sendingMessageToAddress(actualAddress);
Expand All @@ -342,29 +340,6 @@ private Uni<Message<?>> send(AmqpSender sender, Message<?> msg, boolean durable,
.onItem().transform(x -> msg);
}

private void createOutgoingTrace(Message<?> msg, io.vertx.mutiny.amqp.AmqpMessage amqp) {
Optional<TracingMetadata> tracingMetadata = TracingMetadata.fromMessage(msg);
AmqpMessage<?> message = new AmqpMessage<>(amqp, null, null, false, true);

Context parentContext = tracingMetadata.map(TracingMetadata::getCurrentContext).orElse(Context.current());
Context spanContext;
Scope scope = null;

boolean shouldStart = instrumenter.shouldStart(parentContext, message);
if (shouldStart) {
try {
spanContext = instrumenter.start(parentContext, message);
scope = spanContext.makeCurrent();
message.injectTracingMetadata(TracingMetadata.with(spanContext, parentContext));
instrumenter.end(spanContext, message, null, null);
} finally {
if (scope != null) {
scope.close();
}
}
}
}

private String getActualAddress(Message<?> message, io.vertx.mutiny.amqp.AmqpMessage amqp, String configuredAddress,
boolean isAnonymousSender) {
String address = amqp.address();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
import org.apache.qpid.proton.message.MessageError;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.amqp.ce.AmqpCloudEventHelper;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailureHandler;
import io.smallrye.reactive.messaging.ce.CloudEventMetadata;
import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.buffer.Buffer;

public class AmqpMessage<T> implements org.eclipse.microprofile.reactive.messaging.Message<T>, ContextAwareMessage<T> {
public class AmqpMessage<T> implements ContextAwareMessage<T>, MetadataInjectableMessage<T> {

protected static final String APPLICATION_JSON = "application/json";
protected final io.vertx.amqp.AmqpMessage message;
Expand Down Expand Up @@ -238,8 +238,9 @@ public Function<Throwable, CompletionStage<Void>> getNack() {
return this::nack;
}

public synchronized void injectTracingMetadata(TracingMetadata tracingMetadata) {
metadata = metadata.with(tracingMetadata);
@Override
public synchronized void injectMetadata(Object metadataObject) {
metadata = metadata.with(metadataObject);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import io.opentelemetry.api.GlobalOpenTelemetry;
Expand All @@ -37,7 +36,6 @@
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mutiny.amqp.AmqpMessage;

@Disabled("See https://github.com/smallrye/smallrye-reactive-messaging/issues/1268")
public class TracingAmqpToAppTest extends AmqpBrokerTestBase {
private SdkTracerProvider tracerProvider;
private InMemorySpanExporter spanExporter;
Expand Down Expand Up @@ -91,6 +89,7 @@ public void testFromAmqpToAppWithParentSpan() {
weld.addBeanClass(MyAppReceivingData.class);
container = weld.initialize();
await().until(() -> isAmqpConnectorReady(container));
await().until(() -> isAmqpConnectorAlive(container));

MyAppReceivingData bean = container.getBeanManager().createInstance().select(MyAppReceivingData.class).get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public void testFromAmqpToAppToAmqp() {
weld.addBeanClass(MyAppProcessingData.class);
container = weld.initialize();
await().until(() -> isAmqpConnectorReady(container));
await().until(() -> isAmqpConnectorAlive(container));

List<Integer> payloads = new CopyOnWriteArrayList<>();
usage.consumeIntegers("result-topic", payloads::add);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ public void testFromAppToAmqp() {
.with("amqp-password", password)
.write();

List<Integer> payloads = new CopyOnWriteArrayList<>();
usage.consumeIntegers("amqp", payloads::add);

weld.addBeanClass(MyAppGeneratingData.class);
container = weld.initialize();
await().until(() -> isAmqpConnectorReady(container));

List<Integer> payloads = new CopyOnWriteArrayList<>();
usage.consumeIntegers("amqp", payloads::add);
await().until(() -> isAmqpConnectorAlive(container));

await().until(() -> payloads.size() >= 10);
assertThat(payloads).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
Expand Down
22 changes: 5 additions & 17 deletions smallrye-reactive-messaging-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,18 @@
<artifactId>smallrye-reactive-messaging-kafka-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-otel</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api-semconv</artifactId>
</dependency>

<dependency>
<groupId>io.smallrye.config</groupId>
<artifactId>smallrye-config</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper;
import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;

public class IncomingKafkaRecord<K, T> implements KafkaRecord<K, T> {
public class IncomingKafkaRecord<K, T> implements KafkaRecord<K, T>, MetadataInjectableMessage<T> {

private Metadata metadata;
// TODO add as a normal import once we have removed IncomingKafkaRecordMetadata in this package
Expand Down Expand Up @@ -131,6 +132,7 @@ public CompletionStage<Void> nack(Throwable reason, Metadata metadata) {
return onNack.handle(this, reason, metadata).subscribeAsCompletionStage();
}

@Override
public synchronized void injectMetadata(Object metadata) {
this.metadata = this.metadata.with(metadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.reactivestreams.Subscriber;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
Expand All @@ -42,7 +40,6 @@
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
Expand All @@ -57,6 +54,8 @@
import io.smallrye.reactive.messaging.kafka.tracing.KafkaAttributesExtractor;
import io.smallrye.reactive.messaging.kafka.tracing.KafkaTrace;
import io.smallrye.reactive.messaging.kafka.tracing.KafkaTraceTextMapSetter;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.tracing.TracingUtils;

@SuppressWarnings("jol")
public class KafkaSink {
Expand Down Expand Up @@ -220,32 +219,13 @@ record = getProducerRecord(message, outgoingMetadata, incomingMetadata, actualTo
}

if (isTracingEnabled) {
KafkaTrace kafkaTrace = new KafkaTrace.Builder()
TracingUtils.traceOutgoing(instrumenter, message, new KafkaTrace.Builder()
.withPartition(record.partition() != null ? record.partition() : -1)
.withTopic(record.topic())
.withHeaders(record.headers())
.withGroupId(client.get(ConsumerConfig.GROUP_ID_CONFIG))
.withClientId(client.get(ConsumerConfig.CLIENT_ID_CONFIG))
.build();

Optional<TracingMetadata> tracingMetadata = TracingMetadata.fromMessage(message);

Context parentContext = tracingMetadata.map(TracingMetadata::getCurrentContext).orElse(Context.current());
Context spanContext;
Scope scope = null;

boolean shouldStart = instrumenter.shouldStart(parentContext, kafkaTrace);
if (shouldStart) {
try {
spanContext = instrumenter.start(parentContext, kafkaTrace);
scope = spanContext.makeCurrent();
instrumenter.end(spanContext, kafkaTrace, null, null);
} finally {
if (scope != null) {
scope.close();
}
}
}
.build());
}

log.sendingMessageToTopic(message, actualTopic);
Expand Down
Loading

0 comments on commit 877bb69

Please sign in to comment.