diff --git a/pom.xml b/pom.xml index 05ff1fdc1b..8f7666cdad 100644 --- a/pom.xml +++ b/pom.xml @@ -125,6 +125,7 @@ api smallrye-reactive-messaging-provider smallrye-reactive-messaging-in-memory + smallrye-reactive-messaging-otel smallrye-reactive-messaging-kafka smallrye-reactive-messaging-kafka-api smallrye-reactive-messaging-kafka-test-companion diff --git a/smallrye-reactive-messaging-amqp/pom.xml b/smallrye-reactive-messaging-amqp/pom.xml index 52adbfd661..5ce5df0a75 100644 --- a/smallrye-reactive-messaging-amqp/pom.xml +++ b/smallrye-reactive-messaging-amqp/pom.xml @@ -29,25 +29,13 @@ ${smallrye-vertx-mutiny-clients.version} - io.vertx - vertx-amqp-client - - - - io.opentelemetry - opentelemetry-api - - - io.opentelemetry - opentelemetry-semconv - - - io.opentelemetry.instrumentation - opentelemetry-instrumentation-api + io.smallrye.reactive + smallrye-reactive-messaging-otel + ${project.version} - io.opentelemetry.instrumentation - opentelemetry-instrumentation-api-semconv + io.vertx + vertx-amqp-client diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java index 22ac717db3..f34a1554d2 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java @@ -38,8 +38,6 @@ import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; @@ -49,7 +47,6 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; -import io.smallrye.reactive.messaging.TracingMetadata; import io.smallrye.reactive.messaging.amqp.fault.AmqpAccept; import io.smallrye.reactive.messaging.amqp.fault.AmqpFailStop; import io.smallrye.reactive.messaging.amqp.fault.AmqpFailureHandler; @@ -63,6 +60,7 @@ import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.health.HealthReporter; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; +import io.smallrye.reactive.messaging.tracing.TracingUtils; import io.vertx.amqp.AmqpClientOptions; import io.vertx.amqp.AmqpReceiverOptions; import io.vertx.amqp.AmqpSenderOptions; @@ -203,7 +201,7 @@ private Multi> getStreamOfMessages(AmqpReceiver receiver, return Multi.createFrom().deferred( () -> { - Multi> stream = receiver.toMulti() + Multi> stream = receiver.toMulti() .onItem().transformToUniAndConcatenate(m -> { try { return Uni.createFrom().item(new AmqpMessage<>(m, holder.getContext(), onNack, @@ -215,7 +213,8 @@ private Multi> getStreamOfMessages(AmqpReceiver receiver, }); if (tracingEnabled) { - stream = stream.onItem().invoke(this::incomingTrace); + stream = stream.onItem() + .transform(m -> TracingUtils.traceIncoming(instrumenter, m, (AmqpMessage) m)); } return Multi.createBy().merging().streams(stream, processor); @@ -462,28 +461,4 @@ public void reportFailure(String channel, Throwable reason) { terminate(null); } - private void incomingTrace(AmqpMessage message) { - TracingMetadata tracingMetadata = TracingMetadata.fromMessage(message).orElse(TracingMetadata.empty()); - - Context parentContext = tracingMetadata.getPreviousContext(); - if (parentContext == null) { - parentContext = Context.current(); - } - Context spanContext; - Scope scope = null; - - boolean shouldStart = instrumenter.shouldStart(parentContext, message); - if (shouldStart) { - try { - spanContext = instrumenter.start(parentContext, message); - scope = spanContext.makeCurrent(); - message.injectTracingMetadata(TracingMetadata.with(spanContext, parentContext)); - instrumenter.end(spanContext, message, null, null); - } finally { - if (scope != null) { - scope.close(); - } - } - } - } } diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java index b7c8c49833..4869e136ec 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java @@ -16,8 +16,6 @@ import org.reactivestreams.Subscription; import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; @@ -28,11 +26,11 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.tuples.Tuple2; -import io.smallrye.reactive.messaging.TracingMetadata; import io.smallrye.reactive.messaging.amqp.ce.AmqpCloudEventHelper; import io.smallrye.reactive.messaging.amqp.tracing.AmqpAttributesExtractor; import io.smallrye.reactive.messaging.amqp.tracing.AmqpMessageTextMapSetter; import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata; +import io.smallrye.reactive.messaging.tracing.TracingUtils; import io.vertx.amqp.impl.AmqpMessageImpl; import io.vertx.mutiny.amqp.AmqpSender; @@ -326,7 +324,7 @@ private Uni> send(AmqpSender sender, Message msg, boolean durable, } if (tracingEnabled) { - createOutgoingTrace(msg, amqp); + TracingUtils.traceOutgoing(instrumenter, msg, new AmqpMessage<>(amqp, null, null, false, true)); } log.sendingMessageToAddress(actualAddress); @@ -342,29 +340,6 @@ private Uni> send(AmqpSender sender, Message msg, boolean durable, .onItem().transform(x -> msg); } - private void createOutgoingTrace(Message msg, io.vertx.mutiny.amqp.AmqpMessage amqp) { - Optional tracingMetadata = TracingMetadata.fromMessage(msg); - AmqpMessage message = new AmqpMessage<>(amqp, null, null, false, true); - - Context parentContext = tracingMetadata.map(TracingMetadata::getCurrentContext).orElse(Context.current()); - Context spanContext; - Scope scope = null; - - boolean shouldStart = instrumenter.shouldStart(parentContext, message); - if (shouldStart) { - try { - spanContext = instrumenter.start(parentContext, message); - scope = spanContext.makeCurrent(); - message.injectTracingMetadata(TracingMetadata.with(spanContext, parentContext)); - instrumenter.end(spanContext, message, null, null); - } finally { - if (scope != null) { - scope.close(); - } - } - } - } - private String getActualAddress(Message message, io.vertx.mutiny.amqp.AmqpMessage amqp, String configuredAddress, boolean isAnonymousSender) { String address = amqp.address(); diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpMessage.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpMessage.java index 57f90c35ed..a006275533 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpMessage.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpMessage.java @@ -16,17 +16,17 @@ import org.apache.qpid.proton.message.MessageError; import org.eclipse.microprofile.reactive.messaging.Metadata; -import io.smallrye.reactive.messaging.TracingMetadata; import io.smallrye.reactive.messaging.amqp.ce.AmqpCloudEventHelper; import io.smallrye.reactive.messaging.amqp.fault.AmqpFailureHandler; import io.smallrye.reactive.messaging.ce.CloudEventMetadata; +import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; import io.smallrye.reactive.messaging.providers.helpers.VertxContext; import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; import io.vertx.core.json.JsonObject; import io.vertx.mutiny.core.Context; import io.vertx.mutiny.core.buffer.Buffer; -public class AmqpMessage implements org.eclipse.microprofile.reactive.messaging.Message, ContextAwareMessage { +public class AmqpMessage implements ContextAwareMessage, MetadataInjectableMessage { protected static final String APPLICATION_JSON = "application/json"; protected final io.vertx.amqp.AmqpMessage message; @@ -238,8 +238,9 @@ public Function> getNack() { return this::nack; } - public synchronized void injectTracingMetadata(TracingMetadata tracingMetadata) { - metadata = metadata.with(tracingMetadata); + @Override + public synchronized void injectMetadata(Object metadataObject) { + metadata = metadata.with(metadataObject); } } diff --git a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/TracingAmqpToAppTest.java b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/TracingAmqpToAppTest.java index 67fffec6d8..f78dee10ac 100644 --- a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/TracingAmqpToAppTest.java +++ b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/TracingAmqpToAppTest.java @@ -19,7 +19,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import io.opentelemetry.api.GlobalOpenTelemetry; @@ -37,7 +36,6 @@ import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; import io.vertx.mutiny.amqp.AmqpMessage; -@Disabled("See https://github.com/smallrye/smallrye-reactive-messaging/issues/1268") public class TracingAmqpToAppTest extends AmqpBrokerTestBase { private SdkTracerProvider tracerProvider; private InMemorySpanExporter spanExporter; @@ -91,6 +89,7 @@ public void testFromAmqpToAppWithParentSpan() { weld.addBeanClass(MyAppReceivingData.class); container = weld.initialize(); await().until(() -> isAmqpConnectorReady(container)); + await().until(() -> isAmqpConnectorAlive(container)); MyAppReceivingData bean = container.getBeanManager().createInstance().select(MyAppReceivingData.class).get(); diff --git a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/TracingAmqpToAppToAmqpTest.java b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/TracingAmqpToAppToAmqpTest.java index c01d741ce8..f1394423ac 100644 --- a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/TracingAmqpToAppToAmqpTest.java +++ b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/TracingAmqpToAppToAmqpTest.java @@ -98,6 +98,7 @@ public void testFromAmqpToAppToAmqp() { weld.addBeanClass(MyAppProcessingData.class); container = weld.initialize(); await().until(() -> isAmqpConnectorReady(container)); + await().until(() -> isAmqpConnectorAlive(container)); List payloads = new CopyOnWriteArrayList<>(); usage.consumeIntegers("result-topic", payloads::add); diff --git a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/TracingAppToAmqpTest.java b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/TracingAppToAmqpTest.java index 580ab047fd..500381bece 100644 --- a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/TracingAppToAmqpTest.java +++ b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/TracingAppToAmqpTest.java @@ -85,12 +85,13 @@ public void testFromAppToAmqp() { .with("amqp-password", password) .write(); + List payloads = new CopyOnWriteArrayList<>(); + usage.consumeIntegers("amqp", payloads::add); + weld.addBeanClass(MyAppGeneratingData.class); container = weld.initialize(); await().until(() -> isAmqpConnectorReady(container)); - - List payloads = new CopyOnWriteArrayList<>(); - usage.consumeIntegers("amqp", payloads::add); + await().until(() -> isAmqpConnectorAlive(container)); await().until(() -> payloads.size() >= 10); assertThat(payloads).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); diff --git a/smallrye-reactive-messaging-kafka/pom.xml b/smallrye-reactive-messaging-kafka/pom.xml index ba0a511e0c..cf6f7627cd 100644 --- a/smallrye-reactive-messaging-kafka/pom.xml +++ b/smallrye-reactive-messaging-kafka/pom.xml @@ -28,6 +28,11 @@ smallrye-reactive-messaging-kafka-api ${project.version} + + io.smallrye.reactive + smallrye-reactive-messaging-otel + ${project.version} + org.apache.kafka @@ -35,23 +40,6 @@ ${kafka.version} - - io.opentelemetry - opentelemetry-api - - - io.opentelemetry - opentelemetry-semconv - - - io.opentelemetry.instrumentation - opentelemetry-instrumentation-api - - - io.opentelemetry.instrumentation - opentelemetry-instrumentation-api-semconv - - io.smallrye.config smallrye-config diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java index d88cb6fb27..e9bace8928 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java @@ -14,9 +14,10 @@ import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler; import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler; import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper; +import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; -public class IncomingKafkaRecord implements KafkaRecord { +public class IncomingKafkaRecord implements KafkaRecord, MetadataInjectableMessage { private Metadata metadata; // TODO add as a normal import once we have removed IncomingKafkaRecordMetadata in this package @@ -131,6 +132,7 @@ public CompletionStage nack(Throwable reason, Metadata metadata) { return onNack.handle(this, reason, metadata).subscribeAsCompletionStage(); } + @Override public synchronized void injectMetadata(Object metadata) { this.metadata = this.metadata.with(metadata); } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java index 6c54035404..4dee89d737 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java @@ -32,8 +32,6 @@ import org.reactivestreams.Subscriber; import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; @@ -42,7 +40,6 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.OutgoingMessageMetadata; -import io.smallrye.reactive.messaging.TracingMetadata; import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata; import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents; @@ -57,6 +54,8 @@ import io.smallrye.reactive.messaging.kafka.tracing.KafkaAttributesExtractor; import io.smallrye.reactive.messaging.kafka.tracing.KafkaTrace; import io.smallrye.reactive.messaging.kafka.tracing.KafkaTraceTextMapSetter; +import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; +import io.smallrye.reactive.messaging.tracing.TracingUtils; @SuppressWarnings("jol") public class KafkaSink { @@ -220,32 +219,13 @@ record = getProducerRecord(message, outgoingMetadata, incomingMetadata, actualTo } if (isTracingEnabled) { - KafkaTrace kafkaTrace = new KafkaTrace.Builder() + TracingUtils.traceOutgoing(instrumenter, message, new KafkaTrace.Builder() .withPartition(record.partition() != null ? record.partition() : -1) .withTopic(record.topic()) .withHeaders(record.headers()) .withGroupId(client.get(ConsumerConfig.GROUP_ID_CONFIG)) .withClientId(client.get(ConsumerConfig.CLIENT_ID_CONFIG)) - .build(); - - Optional tracingMetadata = TracingMetadata.fromMessage(message); - - Context parentContext = tracingMetadata.map(TracingMetadata::getCurrentContext).orElse(Context.current()); - Context spanContext; - Scope scope = null; - - boolean shouldStart = instrumenter.shouldStart(parentContext, kafkaTrace); - if (shouldStart) { - try { - spanContext = instrumenter.start(parentContext, kafkaTrace); - scope = spanContext.makeCurrent(); - instrumenter.end(spanContext, kafkaTrace, null, null); - } finally { - if (scope != null) { - scope.close(); - } - } - } + .build()); } log.sendingMessageToTopic(message, actualTopic); diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java index 67bbfcc7d3..06f479038a 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java @@ -22,7 +22,6 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; @@ -32,7 +31,6 @@ import io.smallrye.common.annotation.Identifier; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; -import io.smallrye.reactive.messaging.TracingMetadata; import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler; import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord; @@ -48,6 +46,7 @@ import io.smallrye.reactive.messaging.kafka.tracing.KafkaAttributesExtractor; import io.smallrye.reactive.messaging.kafka.tracing.KafkaTrace; import io.smallrye.reactive.messaging.kafka.tracing.KafkaTraceTextMapGetter; +import io.smallrye.reactive.messaging.tracing.TracingUtils; import io.vertx.core.impl.EventLoopContext; import io.vertx.core.impl.VertxInternal; import io.vertx.mutiny.core.Vertx; @@ -279,31 +278,7 @@ public void incomingTrace(IncomingKafkaRecord kafkaRecord, boolean insideB .withClientId(client.get(ConsumerConfig.CLIENT_ID_CONFIG)) .build(); - TracingMetadata tracingMetadata = TracingMetadata.fromMessage(kafkaRecord).orElse(TracingMetadata.empty()); - Context parentContext = tracingMetadata.getPreviousContext(); - if (parentContext == null) { - parentContext = Context.current(); - } - Context spanContext; - Scope scope = null; - boolean shouldStart = instrumenter.shouldStart(parentContext, kafkaTrace); - - if (shouldStart) { - spanContext = instrumenter.start(parentContext, kafkaTrace); - if (!insideBatch) { - scope = spanContext.makeCurrent(); - } - - kafkaRecord.injectMetadata(TracingMetadata.with(spanContext, parentContext)); - - try { - instrumenter.end(spanContext, kafkaTrace, null, null); - } finally { - if (scope != null) { - scope.close(); - } - } - } + TracingUtils.traceIncoming(instrumenter, kafkaRecord, kafkaTrace, !insideBatch); } } diff --git a/smallrye-reactive-messaging-otel/pom.xml b/smallrye-reactive-messaging-otel/pom.xml new file mode 100644 index 0000000000..3458c38ade --- /dev/null +++ b/smallrye-reactive-messaging-otel/pom.xml @@ -0,0 +1,56 @@ + + + + smallrye-reactive-messaging + io.smallrye.reactive + 3.23.0-SNAPSHOT + + 4.0.0 + + smallrye-reactive-messaging-otel + + SmallRye Reactive Messaging : Open Telemetry Instrumenter + + + + ${project.groupId} + smallrye-reactive-messaging-provider + ${project.version} + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-semconv + + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api + + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api-semconv + + + + + + coverage + + @{jacocoArgLine} + + + + + org.jacoco + jacoco-maven-plugin + + + + + + diff --git a/smallrye-reactive-messaging-otel/src/main/java/io/smallrye/reactive/messaging/tracing/TracingUtils.java b/smallrye-reactive-messaging-otel/src/main/java/io/smallrye/reactive/messaging/tracing/TracingUtils.java new file mode 100644 index 0000000000..74bc791822 --- /dev/null +++ b/smallrye-reactive-messaging-otel/src/main/java/io/smallrye/reactive/messaging/tracing/TracingUtils.java @@ -0,0 +1,80 @@ +package io.smallrye.reactive.messaging.tracing; + +import java.util.Optional; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.smallrye.reactive.messaging.TracingMetadata; +import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; + +public class TracingUtils { + + private TracingUtils() { + } + + public static void traceOutgoing(Instrumenter instrumenter, Message message, T trace) { + Optional tracingMetadata = TracingMetadata.fromMessage(message); + + Context parentContext = tracingMetadata.map(TracingMetadata::getCurrentContext).orElse(Context.current()); + Context spanContext; + Scope scope = null; + + boolean shouldStart = instrumenter.shouldStart(parentContext, trace); + if (shouldStart) { + try { + spanContext = instrumenter.start(parentContext, trace); + scope = spanContext.makeCurrent(); + instrumenter.end(spanContext, trace, null, null); + } finally { + if (scope != null) { + scope.close(); + } + } + } + } + + public static Message traceIncoming(Instrumenter instrumenter, Message msg, T trace) { + return traceIncoming(instrumenter, msg, trace, true); + } + + public static Message traceIncoming(Instrumenter instrumenter, Message msg, T trace, + boolean makeCurrent) { + TracingMetadata tracingMetadata = TracingMetadata.fromMessage(msg).orElse(TracingMetadata.empty()); + Context parentContext = tracingMetadata.getPreviousContext(); + if (parentContext == null) { + parentContext = Context.current(); + } + Context spanContext; + Scope scope = null; + boolean shouldStart = instrumenter.shouldStart(parentContext, trace); + + if (shouldStart) { + spanContext = instrumenter.start(parentContext, trace); + if (makeCurrent) { + scope = spanContext.makeCurrent(); + } + + Message message; + TracingMetadata newTracingMetadata = TracingMetadata.with(spanContext, parentContext); + if (msg instanceof MetadataInjectableMessage) { + ((MetadataInjectableMessage) msg).injectMetadata(newTracingMetadata); + message = msg; + } else { + message = msg.addMetadata(newTracingMetadata); + } + + try { + instrumenter.end(spanContext, trace, null, null); + } finally { + if (scope != null) { + scope.close(); + } + } + return message; + } + return msg; + } +} diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/MetadataInjectableMessage.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/MetadataInjectableMessage.java new file mode 100644 index 0000000000..cd11aac06c --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/MetadataInjectableMessage.java @@ -0,0 +1,18 @@ +package io.smallrye.reactive.messaging.providers; + +import org.eclipse.microprofile.reactive.messaging.Message; + +/** + * Message type which enables injecting new metadata without creating a new {@link Message} instance + * + * @param type of payload + */ +public interface MetadataInjectableMessage extends Message { + + /** + * Inject the given metadata object + * + * @param metadataObject metadata object + */ + void injectMetadata(Object metadataObject); +} diff --git a/smallrye-reactive-messaging-rabbitmq/pom.xml b/smallrye-reactive-messaging-rabbitmq/pom.xml index a5ff4035d1..1075518e94 100644 --- a/smallrye-reactive-messaging-rabbitmq/pom.xml +++ b/smallrye-reactive-messaging-rabbitmq/pom.xml @@ -28,6 +28,11 @@ smallrye-mutiny-vertx-rabbitmq-client ${smallrye-vertx-mutiny-clients.version} + + io.smallrye.reactive + smallrye-reactive-messaging-otel + ${project.version} + io.vertx vertx-rabbitmq-client @@ -51,23 +56,6 @@ provided - - io.opentelemetry - opentelemetry-api - - - io.opentelemetry - opentelemetry-semconv - - - io.opentelemetry.instrumentation - opentelemetry-instrumentation-api - - - io.opentelemetry.instrumentation - opentelemetry-instrumentation-api-semconv - - org.testcontainers testcontainers diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java index 78626dabae..176924ee52 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java @@ -22,6 +22,7 @@ import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler; import io.vertx.core.buffer.Buffer; import io.vertx.mutiny.core.Context; +import io.vertx.mutiny.rabbitmq.RabbitMQMessage; /** * An implementation of {@link Message} suitable for incoming RabbitMQ messages. @@ -56,32 +57,25 @@ public CompletionStage handle(IncomingRabbitMQMessage message, Cont private final long deliveryTag; private RabbitMQFailureHandler onNack; private RabbitMQAckHandler onAck; - private final RabbitMQConnectorIncomingConfiguration incomingConfiguration; + private final String contentTypeOverride; private final T payload; - IncomingRabbitMQMessage( - final io.vertx.mutiny.rabbitmq.RabbitMQMessage delegate, - final ConnectionHolder holder, - final RabbitMQFailureHandler onNack, - final RabbitMQAckHandler onAck, - final RabbitMQConnectorIncomingConfiguration incomingConfiguration) { - this(delegate.getDelegate(), holder, onNack, onAck, incomingConfiguration); - } - - IncomingRabbitMQMessage( - final io.vertx.rabbitmq.RabbitMQMessage msg, - final ConnectionHolder holder, - final RabbitMQFailureHandler onNack, - final RabbitMQAckHandler onAck, - final RabbitMQConnectorIncomingConfiguration incomingConfiguration) { + IncomingRabbitMQMessage(RabbitMQMessage delegate, ConnectionHolder holder, + RabbitMQFailureHandler onNack, + RabbitMQAckHandler onAck, String contentTypeOverride) { + this(delegate.getDelegate(), holder, onNack, onAck, contentTypeOverride); + } + + IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ConnectionHolder holder, + RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) { this.message = msg; this.deliveryTag = msg.envelope().getDeliveryTag(); this.holder = holder; this.context = holder.getContext(); - this.rabbitMQMetadata = new IncomingRabbitMQMetadata(message, incomingConfiguration); + this.contentTypeOverride = contentTypeOverride; + this.rabbitMQMetadata = new IncomingRabbitMQMetadata(this.message); this.onNack = onNack; this.onAck = onAck; - this.incomingConfiguration = incomingConfiguration; this.metadata = captureContextMetadata(rabbitMQMetadata); //noinspection unchecked this.payload = (T) convertPayload(message); @@ -155,10 +149,14 @@ public Metadata getMetadata() { private Object convertPayload(io.vertx.rabbitmq.RabbitMQMessage msg) { // Neither of these are guaranteed to be non-null - final String contentType = incomingConfiguration.getContentTypeOverride().orElse(msg.properties().getContentType()); + String contentType = msg.properties().getContentType(); final String contentEncoding = msg.properties().getContentEncoding(); final Buffer body = msg.body(); + if (this.contentTypeOverride != null) { + contentType = contentTypeOverride; + } + // If there is a content encoding specified, we don't try to unwrap if (contentEncoding == null) { try { diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMetadata.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMetadata.java index 7166807bb0..e619d478c8 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMetadata.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMetadata.java @@ -21,7 +21,6 @@ public class IncomingRabbitMQMetadata { private final RabbitMQMessage message; - private final RabbitMQConnectorIncomingConfiguration incomingConfiguration; private final Map headers; /** @@ -29,10 +28,8 @@ public class IncomingRabbitMQMetadata { * * @param message the underlying {@link RabbitMQMessage} */ - IncomingRabbitMQMetadata(final RabbitMQMessage message, - final RabbitMQConnectorIncomingConfiguration incomingConfiguration) { + IncomingRabbitMQMetadata(RabbitMQMessage message) { this.message = message; - this.incomingConfiguration = incomingConfiguration; // Ensure the message headers are cast appropriately final Map incomingHeaders = message.properties().getHeaders(); @@ -278,10 +275,6 @@ public Optional getId() { return Optional.ofNullable(message.properties().getMessageId()); } - public String getQueueName() { - return incomingConfiguration.getQueueName(); - } - /** * The exchange the message was delivered to. *

@@ -316,8 +309,4 @@ public String getRoutingKey() { public boolean isRedeliver() { return message.envelope().isRedeliver(); } - - public boolean isTracingEnabled() { - return incomingConfiguration.getTracingEnabled(); - } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index 12ce13663a..0057d07e61 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -1,5 +1,6 @@ package io.smallrye.reactive.messaging.rabbitmq; +import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.RECEIVE; import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING; import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING; import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING; @@ -12,6 +13,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; +import javax.annotation.PostConstruct; import javax.annotation.Priority; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.context.BeforeDestroyed; @@ -34,6 +36,12 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.impl.CredentialsProvider; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.tuples.Tuple2; @@ -48,6 +56,10 @@ import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailStop; import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler; import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQReject; +import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTrace; +import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTraceAttributesExtractor; +import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTraceTextMapGetter; +import io.smallrye.reactive.messaging.tracing.TracingUtils; import io.vertx.core.json.JsonObject; import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.rabbitmq.RabbitMQClient; @@ -141,7 +153,7 @@ public class RabbitMQConnector implements IncomingConnectorFactory, OutgoingConn private enum ChannelStatus { CONNECTED, NOT_CONNECTED, - INITIALISING + INITIALISING; } // The list of RabbitMQClient's currently managed by this connector @@ -166,6 +178,8 @@ private enum ChannelStatus { @Any Instance credentialsProviders; + private Instrumenter instrumenter; + RabbitMQConnector() { // used for proxies } @@ -174,6 +188,19 @@ public static String getExchangeName(final RabbitMQConnectorCommonConfiguration return config.getExchangeName().map(s -> "\"\"".equals(s) ? "" : s).orElse(config.getChannel()); } + @PostConstruct + void init() { + RabbitMQTraceAttributesExtractor rabbitMQAttributesExtractor = new RabbitMQTraceAttributesExtractor(); + MessagingAttributesGetter messagingAttributesGetter = rabbitMQAttributesExtractor + .getMessagingAttributesGetter(); + InstrumenterBuilder builder = Instrumenter.builder(GlobalOpenTelemetry.get(), + "io.smallrye.reactive.messaging", MessagingSpanNameExtractor.create(messagingAttributesGetter, RECEIVE)); + + instrumenter = builder.addAttributesExtractor(rabbitMQAttributesExtractor) + .addAttributesExtractor(MessagingAttributesExtractor.create(messagingAttributesGetter, RECEIVE)) + .buildConsumerInstrumenter(RabbitMQTraceTextMapGetter.INSTANCE); + } + private Multi> getStreamOfMessages( RabbitMQConsumer receiver, ConnectionHolder holder, @@ -181,9 +208,22 @@ private Multi> getStreamOfMessages( RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck) { - log.receiverListeningAddress(ic.getQueueName()); - - return receiver.toMulti().map(m -> new IncomingRabbitMQMessage<>(m, holder, onNack, onAck, ic)); + final String queueName = ic.getQueueName(); + final boolean isTracingEnabled = ic.getTracingEnabled(); + final String contentTypeOverride = ic.getContentTypeOverride().orElse(null); + log.receiverListeningAddress(queueName); + + return receiver.toMulti() + .map(m -> new IncomingRabbitMQMessage<>(m, holder, onNack, onAck, contentTypeOverride)) + .plug(m -> { + if (isTracingEnabled) { + return m.map(msg -> { + TracingUtils.traceIncoming(instrumenter, msg, RabbitMQTrace.trace(queueName, msg.getHeaders())); + return msg; + }); + } + return m; + }); } /** diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQMessageConverter.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQMessageConverter.java index 0df45d3768..6e34fa7c41 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQMessageConverter.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQMessageConverter.java @@ -16,7 +16,7 @@ import io.netty.handler.codec.http.HttpHeaderValues; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTrace; -import io.smallrye.reactive.messaging.rabbitmq.tracing.TracingUtils; +import io.smallrye.reactive.messaging.tracing.TracingUtils; import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; @@ -78,7 +78,7 @@ public static OutgoingRabbitMQMessage convert( if (isTracingEnabled) { // Create a new span for the outbound message and record updated tracing information in // the headers; this has to be done before we build the properties below - TracingUtils.createOutgoingTrace(instrumenter, message, sourceHeaders, exchange); + TracingUtils.traceOutgoing(instrumenter, message, RabbitMQTrace.trace(exchange, sourceHeaders)); } // Reconstruct the properties from the source, except with the (possibly) modified headers; @@ -120,7 +120,7 @@ public static OutgoingRabbitMQMessage convert( if (isTracingEnabled) { // Create a new span for the outbound message and record updated tracing information in // the message headers; this has to be done before we build the properties below - TracingUtils.createOutgoingTrace(instrumenter, message, metadata.getHeaders(), exchange); + TracingUtils.traceOutgoing(instrumenter, message, RabbitMQTrace.trace(exchange, metadata.getHeaders())); } final Date timestamp = (metadata.getTimestamp() != null) ? Date.from(metadata.getTimestamp().toInstant()) : null; diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTracingSubscriberDecorator.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTracingSubscriberDecorator.java deleted file mode 100644 index 1953d29c58..0000000000 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTracingSubscriberDecorator.java +++ /dev/null @@ -1,87 +0,0 @@ -package io.smallrye.reactive.messaging.rabbitmq.tracing; - -import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.RECEIVE; - -import java.util.List; -import java.util.Optional; - -import javax.enterprise.context.ApplicationScoped; - -import org.eclipse.microprofile.reactive.messaging.Message; - -import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; -import io.smallrye.mutiny.Multi; -import io.smallrye.reactive.messaging.SubscriberDecorator; -import io.smallrye.reactive.messaging.TracingMetadata; -import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMetadata; - -@ApplicationScoped -public class RabbitMQTracingSubscriberDecorator implements SubscriberDecorator { - private final Instrumenter instrumenter; - - public RabbitMQTracingSubscriberDecorator() { - RabbitMQTraceAttributesExtractor rabbitMQAttributesExtractor = new RabbitMQTraceAttributesExtractor(); - MessagingAttributesGetter messagingAttributesGetter = rabbitMQAttributesExtractor - .getMessagingAttributesGetter(); - InstrumenterBuilder builder = Instrumenter.builder(GlobalOpenTelemetry.get(), - "io.smallrye.reactive.messaging", MessagingSpanNameExtractor.create(messagingAttributesGetter, RECEIVE)); - - instrumenter = builder.addAttributesExtractor(rabbitMQAttributesExtractor) - .addAttributesExtractor(MessagingAttributesExtractor.create(messagingAttributesGetter, RECEIVE)) - .buildConsumerInstrumenter(RabbitMQTraceTextMapGetter.INSTANCE); - } - - @Override - public Multi> decorate( - final Multi> toBeSubscribed, - final List channelName, - final boolean isConnector) { - return toBeSubscribed.onItem().transform(this::traceMessage); - } - - private Message traceMessage(final Message message) { - Optional incomingRabbitMQMetadata = message.getMetadata().get(IncomingRabbitMQMetadata.class); - if (incomingRabbitMQMetadata.isEmpty()) { - return message; - } - - IncomingRabbitMQMetadata metadata = incomingRabbitMQMetadata.get(); - if (!metadata.isTracingEnabled()) { - // TODO - We can optimize this and not even create the bean? - return message; - } - - TracingMetadata tracingMetadata = TracingMetadata.fromMessage(message).orElse(TracingMetadata.empty()); - RabbitMQTrace trace = RabbitMQTrace.trace(metadata.getQueueName(), metadata.getHeaders()); - - Context parentContext = tracingMetadata.getPreviousContext(); - if (parentContext == null) { - parentContext = Context.current(); - } - Context spanContext; - Scope scope = null; - - boolean shouldStart = instrumenter.shouldStart(parentContext, trace); - if (shouldStart) { - try { - spanContext = instrumenter.start(parentContext, trace); - scope = spanContext.makeCurrent(); - instrumenter.end(spanContext, trace, null, null); - return message.addMetadata(TracingMetadata.with(spanContext, parentContext)); - } finally { - if (scope != null) { - scope.close(); - } - } - } - - return message; - } -} diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/TracingUtils.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/TracingUtils.java deleted file mode 100644 index cfd8a48c69..0000000000 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/TracingUtils.java +++ /dev/null @@ -1,49 +0,0 @@ -package io.smallrye.reactive.messaging.rabbitmq.tracing; - -import java.util.Map; -import java.util.Optional; - -import org.eclipse.microprofile.reactive.messaging.Message; - -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.smallrye.reactive.messaging.TracingMetadata; - -/** - * Utility methods to manage spans for incoming and outgoing messages. - */ -public abstract class TracingUtils { - /** - * Private constructor to prevent instantiation. - */ - private TracingUtils() { - } - - public static void createOutgoingTrace( - final Instrumenter instrumenter, - final Message msg, - final Map headers, - final String exchange) { - - Optional tracingMetadata = TracingMetadata.fromMessage(msg); - RabbitMQTrace message = RabbitMQTrace.trace(exchange, headers); - - Context parentContext = tracingMetadata.map(TracingMetadata::getCurrentContext).orElse(Context.current()); - Context spanContext; - Scope scope = null; - - boolean shouldStart = instrumenter.shouldStart(parentContext, message); - if (shouldStart) { - try { - spanContext = instrumenter.start(parentContext, message); - scope = spanContext.makeCurrent(); - instrumenter.end(spanContext, message, null, null); - } finally { - if (scope != null) { - scope.close(); - } - } - } - } -} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ConsumptionBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ConsumptionBean.java index a4acb56f20..71b19771f0 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ConsumptionBean.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ConsumptionBean.java @@ -25,7 +25,7 @@ public class ConsumptionBean { @Incoming("data") @Outgoing("sink") @Acknowledgment(Acknowledgment.Strategy.MANUAL) - public Message process(IncomingRabbitMQMessage input) { + public Message process(Message input) { int value = -1; try { value = Integer.parseInt(input.getPayload()); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java index aceceb1b01..976b6922fd 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java @@ -5,7 +5,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -46,13 +45,12 @@ public void testDoubleAckBehavior() { when(mockMsg.envelope()).thenReturn(new Envelope(13456, false, "test", "test")); RabbitMQMessage msg = RabbitMQMessage.newInstance(mockMsg); - RabbitMQConnectorIncomingConfiguration incomingConfiguration = mock(RabbitMQConnectorIncomingConfiguration.class); - when(incomingConfiguration.getContentTypeOverride()).thenReturn(Optional.of("text/plain")); - Exception nackReason = new Exception("test"); - IncomingRabbitMQMessage ackMsg = new IncomingRabbitMQMessage<>(msg, mock(ConnectionHolder.class), doNothingNack, - doNothingAck, incomingConfiguration); + IncomingRabbitMQMessage ackMsg = new IncomingRabbitMQMessage<>(msg, mock(ConnectionHolder.class), + doNothingNack, + doNothingAck, + "text/plain"); assertDoesNotThrow(() -> ackMsg.ack().toCompletableFuture().get()); assertDoesNotThrow(() -> ackMsg.ack().toCompletableFuture().get()); @@ -67,13 +65,12 @@ public void testDoubleNackBehavior() { when(mockMsg.envelope()).thenReturn(new Envelope(13456, false, "test", "test")); RabbitMQMessage msg = RabbitMQMessage.newInstance(mockMsg); - RabbitMQConnectorIncomingConfiguration incomingConfiguration = mock(RabbitMQConnectorIncomingConfiguration.class); - when(incomingConfiguration.getContentTypeOverride()).thenReturn(Optional.of("text/plain")); - Exception nackReason = new Exception("test"); IncomingRabbitMQMessage nackMsg = new IncomingRabbitMQMessage<>(msg, mock(ConnectionHolder.class), - doNothingNack, doNothingAck, incomingConfiguration); + doNothingNack, + doNothingAck, + "text/plain"); assertDoesNotThrow(() -> nackMsg.nack(nackReason).toCompletableFuture().get()); assertDoesNotThrow(() -> nackMsg.nack(nackReason).toCompletableFuture().get()); @@ -88,11 +85,9 @@ void testConvertPayload() { when(mockMsg.envelope()).thenReturn(new Envelope(13456, false, "test", "test")); RabbitMQMessage msg = RabbitMQMessage.newInstance(mockMsg); - RabbitMQConnectorIncomingConfiguration incomingConfiguration = mock(RabbitMQConnectorIncomingConfiguration.class); - when(incomingConfiguration.getContentTypeOverride()).thenReturn(Optional.of("text/plain")); - IncomingRabbitMQMessage incomingRabbitMQMessage = new IncomingRabbitMQMessage<>(msg, - mock(ConnectionHolder.class), doNothingNack, doNothingAck, incomingConfiguration); + mock(ConnectionHolder.class), + doNothingNack, doNothingAck, "text/plain"); assertThat(incomingRabbitMQMessage.getPayload()).isEqualTo("payload"); } @@ -107,11 +102,9 @@ void testConvertPayloadJsonObject() { when(mockMsg.envelope()).thenReturn(new Envelope(13456, false, "test", "test")); RabbitMQMessage msg = RabbitMQMessage.newInstance(mockMsg); - RabbitMQConnectorIncomingConfiguration incomingConfiguration = mock(RabbitMQConnectorIncomingConfiguration.class); - when(incomingConfiguration.getContentTypeOverride()).thenReturn(Optional.empty()); - IncomingRabbitMQMessage incomingRabbitMQMessage = new IncomingRabbitMQMessage<>(msg, - mock(ConnectionHolder.class), doNothingNack, doNothingAck, incomingConfiguration); + mock(ConnectionHolder.class), + doNothingNack, doNothingAck, null); assertThat(incomingRabbitMQMessage.getPayload()).isEqualTo(payload); } @@ -125,12 +118,9 @@ void testConvertPayloadFallback() { when(mockMsg.envelope()).thenReturn(new Envelope(13456, false, "test", "test")); RabbitMQMessage msg = RabbitMQMessage.newInstance(mockMsg); - RabbitMQConnectorIncomingConfiguration incomingConfiguration = mock(RabbitMQConnectorIncomingConfiguration.class); - when(incomingConfiguration.getContentTypeOverride()).thenReturn(Optional.empty()); - IncomingRabbitMQMessage incomingRabbitMQMessage = new IncomingRabbitMQMessage<>(msg, mock(ConnectionHolder.class), - doNothingNack, doNothingAck, incomingConfiguration); + doNothingNack, doNothingAck, null); assertThat(((Message) ((Message) incomingRabbitMQMessage)).getPayload()).isEqualTo(payloadBuffer.getBytes()); } diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMetadataTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMetadataTest.java index b9e9ef2279..9faa2a87c4 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMetadataTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMetadataTest.java @@ -1,12 +1,10 @@ package io.smallrye.reactive.messaging.rabbitmq; -import static org.junit.jupiter.api.Assertions.*; -import static org.junit.jupiter.api.Assertions.assertEquals; - import java.util.Date; import java.util.HashMap; import java.util.Map; +import org.junit.Assert; import org.junit.jupiter.api.Test; import com.rabbitmq.client.BasicProperties; @@ -25,15 +23,15 @@ public void testHeaderWithNullValue() { DummyRabbitMQMessage message = new DummyRabbitMQMessage(new DummyBasicProperties(properties)); - IncomingRabbitMQMetadata incomingRabbitMQMetadata = new IncomingRabbitMQMetadata(message, null); + IncomingRabbitMQMetadata incomingRabbitMQMetadata = new IncomingRabbitMQMetadata(message); - assertEquals("value1", incomingRabbitMQMetadata.getHeaders().get("header1")); - assertTrue(incomingRabbitMQMetadata.getHeaders().containsKey("header2")); - assertNull(incomingRabbitMQMetadata.getHeaders().get("header2")); + Assert.assertEquals("value1", incomingRabbitMQMetadata.getHeaders().get("header1")); + Assert.assertTrue(incomingRabbitMQMetadata.getHeaders().containsKey("header2")); + Assert.assertNull(incomingRabbitMQMetadata.getHeaders().get("header2")); } - static class DummyRabbitMQMessage implements RabbitMQMessage { + class DummyRabbitMQMessage implements RabbitMQMessage { protected BasicProperties properties; DummyRabbitMQMessage(BasicProperties properties) { @@ -66,7 +64,7 @@ public Integer messageCount() { } } - static class DummyBasicProperties implements BasicProperties { + class DummyBasicProperties implements BasicProperties { protected Map headers; DummyBasicProperties(Map headers) { diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQMetadataTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQMetadataTest.java index 5be9b0f7ed..37c3daccbc 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQMetadataTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQMetadataTest.java @@ -66,7 +66,7 @@ public Integer messageCount() { } }; - IncomingRabbitMQMetadata incoming = new IncomingRabbitMQMetadata(message, null); + IncomingRabbitMQMetadata incoming = new IncomingRabbitMQMetadata(message); assertThat(incoming.getUserId()).isEqualTo(Optional.of("test-user")); assertThat(incoming.getAppId()).isEqualTo(Optional.of("tests")); assertThat(incoming.getContentType()).isEqualTo(Optional.of("text/plain")); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/TracingTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/TracingTest.java index c5d7d5e012..25d88134b4 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/TracingTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/TracingTest.java @@ -47,7 +47,6 @@ import io.opentelemetry.sdk.trace.samplers.Sampler; import io.smallrye.reactive.messaging.providers.connectors.InMemoryConnector; import io.smallrye.reactive.messaging.providers.connectors.InMemorySource; -import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTracingSubscriberDecorator; public class TracingTest extends WeldTestBase { private SdkTracerProvider tracerProvider; @@ -69,8 +68,6 @@ public void openTelemetry() { .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) .setTracerProvider(tracerProvider) .buildAndRegisterGlobal(); - - addBeans(RabbitMQTracingSubscriberDecorator.class); } @Test diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/WeldTestBase.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/WeldTestBase.java index 79953f3084..8fd733c7e6 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/WeldTestBase.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/WeldTestBase.java @@ -14,7 +14,6 @@ import io.smallrye.config.inject.ConfigExtension; import io.smallrye.reactive.messaging.providers.MediatorFactory; -import io.smallrye.reactive.messaging.providers.OutgoingInterceptorDecorator; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry; import io.smallrye.reactive.messaging.providers.extension.ChannelProducer; @@ -70,7 +69,6 @@ public void initWeld() { weld.addBeanClass(MetricDecorator.class); weld.addBeanClass(MicrometerDecorator.class); weld.addBeanClass(ContextDecorator.class); - weld.addBeanClass(OutgoingInterceptorDecorator.class); weld.disableDiscovery(); }