Skip to content

Commit

Permalink
Merge pull request #2227 from brunobat/main
Browse files Browse the repository at this point in the history
Bump to OTel 1.28
  • Loading branch information
cescoffier authored Jul 23, 2023
2 parents cab8f46 + 570f8e8 commit 2665368
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 56 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@

<kafka.version>3.5.1</kafka.version>

<opentelemetry.version>1.25.0-alpha</opentelemetry.version>
<opentelemetry.version>1.28.0-alpha</opentelemetry.version>

<smallrye-vertx-mutiny-clients.version>3.3.0</smallrye-vertx-mutiny-clients.version>
<smallrye-reactive-converters.version>3.0.0</smallrye-reactive-converters.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@ public String getSystem(final AmqpMessage<?> amqpMessage) {
return "AMQP 1.0";
}

// Required if the message destination is either a queue or topic
@Override
public String getDestinationKind(final AmqpMessage<?> amqpMessage) {
return "queue";
}

// Required
@Override
public String getDestination(final AmqpMessage<?> amqpMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static AmqpOpenTelemetryInstrumenter createForSender() {
}

private static AmqpOpenTelemetryInstrumenter create(boolean sender) {
MessageOperation messageOperation = sender ? MessageOperation.SEND : MessageOperation.RECEIVE;
MessageOperation messageOperation = sender ? MessageOperation.PUBLISH : MessageOperation.RECEIVE;
AmqpAttributesExtractor amqpAttributesExtractor = new AmqpAttributesExtractor();
MessagingAttributesGetter<AmqpMessage<?>, Void> messagingAttributesGetter = amqpAttributesExtractor
.getMessagingAttributesGetter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import static io.opentelemetry.api.trace.SpanKind.PRODUCER;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM;
Expand Down Expand Up @@ -140,17 +139,15 @@ public void testFromAmqpToAppToAmqp() {
SpanData consumer = parentSpans.get(0);
assertEquals(CONSUMER, consumer.getKind());
assertEquals("AMQP 1.0", consumer.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("queue", consumer.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals("parent-topic", consumer.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals("parent-topic receive", consumer.getName());
assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION));

SpanData producer = spans.stream().filter(span -> span.getParentSpanId().equals(consumer.getSpanId())).findFirst()
.get();
assertEquals(PRODUCER, producer.getKind());
assertEquals("queue", producer.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals("result-topic", producer.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals("result-topic send", producer.getName());
assertEquals("result-topic publish", producer.getName());
assertNull(producer.getAttributes().get(MESSAGING_OPERATION));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ public String getSystem(final KafkaTrace kafkaTrace) {
return "kafka";
}

@Override
public String getDestinationKind(final KafkaTrace kafkaTrace) {
return "topic";
}

@Override
public String getDestination(final KafkaTrace kafkaTrace) {
return kafkaTrace.getTopic();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static KafkaOpenTelemetryInstrumenter createForSink() {

private static KafkaOpenTelemetryInstrumenter create(boolean source) {

MessageOperation messageOperation = source ? MessageOperation.RECEIVE : MessageOperation.SEND;
MessageOperation messageOperation = source ? MessageOperation.RECEIVE : MessageOperation.PUBLISH;

KafkaAttributesExtractor kafkaAttributesExtractor = new KafkaAttributesExtractor();
MessagingAttributesGetter<KafkaTrace, Void> messagingAttributesGetter = kafkaAttributesExtractor
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.smallrye.reactive.messaging.kafka.tracing;

import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM;
import static io.smallrye.reactive.messaging.kafka.companion.RecordQualifiers.until;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -111,10 +113,13 @@ public void testFromAppToKafka() {

SpanData span = spans.get(0);
assertEquals(SpanKind.PRODUCER, span.getKind());
assertEquals(4, span.getAttributes().size());
assertEquals("kafka", span.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("topic", span.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals(topic, span.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals(topic + " send", span.getName());
assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID));
assertEquals(0, span.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET));

assertEquals(topic + " publish", span.getName());
});
}

Expand Down Expand Up @@ -145,10 +150,12 @@ public void testFromAppToKafkaWithStructuredCloudEvents() {

SpanData span = spans.get(0);
assertEquals(SpanKind.PRODUCER, span.getKind());
assertEquals(4, span.getAttributes().size());
assertEquals("kafka", span.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("topic", span.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals(topic, span.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals(topic + " send", span.getName());
assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID));
assertEquals(0, span.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET));
assertEquals(topic + " publish", span.getName());
});
}

Expand Down Expand Up @@ -179,10 +186,12 @@ public void testFromAppToKafkaWithBinaryCloudEvents() {

SpanData span = spans.get(0);
assertEquals(SpanKind.PRODUCER, span.getKind());
assertEquals(4, span.getAttributes().size());
assertEquals("kafka", span.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("topic", span.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals(topic, span.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals(topic + " send", span.getName());
assertEquals("kafka-producer-kafka", span.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID));
assertEquals(0, span.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET));
assertEquals(topic + " publish", span.getName());
});
}

Expand Down Expand Up @@ -222,17 +231,23 @@ public void testFromKafkaToAppToKafka() {

SpanData consumer = parentSpans.get(0);
assertEquals(SpanKind.CONSUMER, consumer.getKind());
assertEquals("topic", consumer.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals(8, consumer.getAttributes().size());
assertEquals("kafka", consumer.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION));
assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals("kafka-consumer-source", consumer.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID));
assertEquals(0, consumer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET));
assertEquals(parentTopic + " receive", consumer.getName());

SpanData producer = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(consumer.getSpanId()))
.findFirst().get();
assertEquals(SpanKind.PRODUCER, producer.getKind());
assertEquals(4, producer.getAttributes().size());
assertEquals("kafka", producer.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("topic", producer.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals(resultTopic, producer.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals(resultTopic + " send", producer.getName());
assertEquals("kafka-producer-kafka", producer.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID));
assertEquals(0, producer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET));
assertEquals(resultTopic + " publish", producer.getName());
});
}

Expand Down Expand Up @@ -280,9 +295,12 @@ public void testFromKafkaToAppWithParentSpan() {

SpanData consumer = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(producer.getSpanId()))
.findFirst().get();
assertEquals(8, consumer.getAttributes().size());
assertEquals("kafka", consumer.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("topic", consumer.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION));
assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals("kafka-consumer-stuff", consumer.getAttributes().get(MESSAGING_KAFKA_CLIENT_ID));
assertEquals(0, consumer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET));
assertEquals(parentTopic + " receive", consumer.getName());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ public PulsarOutgoingChannel(PulsarClient client, Schema<T> schema, PulsarConnec
.getMessagingAttributesGetter();
InstrumenterBuilder<PulsarTrace, Void> instrumenterBuilder = Instrumenter.builder(GlobalOpenTelemetry.get(),
"io.smallrye.reactive.messaging",
MessagingSpanNameExtractor.create(messagingAttributesGetter, MessageOperation.SEND));
MessagingSpanNameExtractor.create(messagingAttributesGetter, MessageOperation.PUBLISH));

instrumenter = instrumenterBuilder
.addAttributesExtractor(MessagingAttributesExtractor.create(messagingAttributesGetter, MessageOperation.SEND))
.addAttributesExtractor(
MessagingAttributesExtractor.create(messagingAttributesGetter, MessageOperation.PUBLISH))
.addAttributesExtractor(AttributesExtractor)
.buildProducerInstrumenter(PulsarTraceTextMapSetter.INSTANCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ public String getSystem(PulsarTrace pulsarTrace) {
return "pulsar";
}

@Override
public String getDestinationKind(PulsarTrace pulsarTrace) {
return "topic";
}

@Override
public String getDestination(PulsarTrace pulsarTrace) {
return pulsarTrace.getTopic();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.smallrye.reactive.messaging.pulsar.tracing;

import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -113,8 +112,7 @@ public void testFromAppToPulsar() throws PulsarClientException {
SpanData span = spans.get(0);
assertEquals(SpanKind.PRODUCER, span.getKind());
assertEquals("pulsar", span.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("topic", span.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals(topic + " send", span.getName());
assertEquals(topic + " publish", span.getName());
});
}

Expand Down Expand Up @@ -150,8 +148,7 @@ public void testFromAppToPulsarWithStructuredCloudEvents() throws PulsarClientEx
SpanData span = spans.get(0);
assertEquals(SpanKind.PRODUCER, span.getKind());
assertEquals("kafka", span.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("topic", span.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals(topic + " send", span.getName());
assertEquals(topic + " publish", span.getName());
});
}

Expand Down Expand Up @@ -186,8 +183,7 @@ public void testFromAppToPulsarWithBinaryCloudEvents() throws PulsarClientExcept
SpanData span = spans.get(0);
assertEquals(SpanKind.PRODUCER, span.getKind());
assertEquals("kafka", span.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("topic", span.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals(topic + " send", span.getName());
assertEquals(topic + " publish", span.getName());
});
}

Expand Down Expand Up @@ -229,15 +225,13 @@ public void testFromPulsarToAppToPulsar() throws PulsarClientException {

SpanData consumer = parentSpans.get(0);
assertEquals(SpanKind.CONSUMER, consumer.getKind());
assertEquals("topic", consumer.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals("persistent://public/default/" + parentTopic + " receive", consumer.getName());

SpanData producer = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(consumer.getSpanId()))
.findFirst().get();
assertEquals(SpanKind.PRODUCER, producer.getKind());
assertEquals("pulsar", producer.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("topic", producer.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals(resultTopic + " send", producer.getName());
assertEquals(resultTopic + " publish", producer.getName());
});
}

Expand Down Expand Up @@ -290,7 +284,6 @@ public void testFromPulsarToAppWithParentSpan() throws PulsarClientException {
SpanData consumer = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(producer.getSpanId()))
.findFirst().get();
assertEquals("pulsar", consumer.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("topic", consumer.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals("persistent://public/default/" + parentTopic + " receive", consumer.getName());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static RabbitMQOpenTelemetryInstrumenter createForConnector() {
}

private static RabbitMQOpenTelemetryInstrumenter create(boolean sender) {
MessageOperation messageOperation = sender ? MessageOperation.SEND : MessageOperation.RECEIVE;
MessageOperation messageOperation = sender ? MessageOperation.PUBLISH : MessageOperation.RECEIVE;

RabbitMQTraceAttributesExtractor rabbitMQAttributesExtractor = new RabbitMQTraceAttributesExtractor();
MessagingAttributesGetter<RabbitMQTrace, Void> messagingAttributesGetter = rabbitMQAttributesExtractor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ public String getSystem(final RabbitMQTrace rabbitMQTrace) {
return "rabbitmq";
}

@Override
public String getDestinationKind(final RabbitMQTrace rabbitMQTrace) {
return rabbitMQTrace.getDestinationKind();
}

@Override
public String getDestination(final RabbitMQTrace rabbitMQTrace) {
return rabbitMQTrace.getDestination();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package io.smallrye.reactive.messaging.rabbitmq;

import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_PROTOCOL;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_PROTOCOL_VERSION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -101,10 +102,11 @@ void incoming() {
SpanData consumer = spans.get(0);
assertEquals(SpanKind.CONSUMER, consumer.getKind());
assertEquals("rabbitmq", consumer.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION));
assertEquals("normal", consumer.getAttributes().get(MESSAGING_RABBITMQ_ROUTING_KEY));
assertEquals(queue, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertNull(consumer.getAttributes().get(MESSAGING_PROTOCOL));
assertNull(consumer.getAttributes().get(MESSAGING_PROTOCOL_VERSION));
assertEquals("queue", consumer.getAttributes().get(MESSAGING_DESTINATION_KIND));
assertEquals(queue, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals(queue + " receive", consumer.getName());
});
}
Expand Down

0 comments on commit 2665368

Please sign in to comment.