From 756a3e8bdd49f29f8500db0fe2e950da3236938f Mon Sep 17 00:00:00 2001 From: Michal Cukierman Date: Sat, 9 Mar 2024 13:36:16 +0100 Subject: [PATCH 1/2] #2519 Fix: PulsarOutgoingChannel tracing properties propagation --- .../pulsar/PulsarOutgoingChannel.java | 21 ++++++++++--------- .../tracing/TracingPropagationTest.java | 13 ++++++++---- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java index 6ea7643fd5..4d049ce597 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java @@ -3,7 +3,6 @@ import static io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging.log; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -126,12 +125,15 @@ private TypedMessageBuilder toMessageBuilder(Message message, Producer final TypedMessageBuilder messageBuilder; if (optionalMetadata.isPresent()) { PulsarOutgoingMessageMetadata metadata = optionalMetadata.get(); + Map properties = metadata.getProperties(); if (tracingEnabled) { - TracingUtils.traceOutgoing(instrumenter, message, new PulsarTrace.Builder() - .withProperties(metadata.getProperties()) + PulsarTrace trace = new PulsarTrace.Builder() + .withProperties(properties) .withSequenceId(metadata.getSequenceId()) .withTopic(producer.getTopic()) - .build()); + .build(); + properties = trace.getProperties(); + TracingUtils.traceOutgoing(instrumenter, message, trace); } messageBuilder = createMessageBuilder(message, metadata.getTransaction()); @@ -146,7 +148,7 @@ private TypedMessageBuilder toMessageBuilder(Message message, Producer messageBuilder.orderingKey(metadata.getOrderingKey()); } if (metadata.getProperties() != null) { - messageBuilder.properties(metadata.getProperties()); + messageBuilder.properties(properties); } if (metadata.getReplicatedClusters() != null) { messageBuilder.replicationClusters(metadata.getReplicatedClusters()); @@ -166,12 +168,11 @@ private TypedMessageBuilder toMessageBuilder(Message message, Producer } else { messageBuilder = createMessageBuilder(message, null); if (tracingEnabled) { - Map properties = new HashMap<>(); - TracingUtils.traceOutgoing(instrumenter, message, new PulsarTrace.Builder() - .withProperties(properties) + PulsarTrace trace = new PulsarTrace.Builder() .withTopic(producer.getTopic()) - .build()); - messageBuilder.properties(properties); + .build(); + TracingUtils.traceOutgoing(instrumenter, message, trace); + messageBuilder.properties(trace.getProperties()); } } Object payload = message.getPayload(); diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/tracing/TracingPropagationTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/tracing/TracingPropagationTest.java index cf19c29b9c..bd520747a6 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/tracing/TracingPropagationTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/tracing/TracingPropagationTest.java @@ -191,12 +191,12 @@ public void testFromAppToPulsarWithBinaryCloudEvents() throws PulsarClientExcept public void testFromPulsarToAppToPulsar() throws PulsarClientException { String resultTopic = topic + "-result"; String parentTopic = topic + "-parent"; - List values = new ArrayList<>(); + List messages = new ArrayList<>(); receive(client.newConsumer(Schema.INT32) .topic(resultTopic) .subscriptionName(topic + "-consumer") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscribe(), 10, i -> values.add(i.getValue())); + .subscribe(), 10, messages::add); runApplication(getConfigForMyAppProcessingData(resultTopic, parentTopic), MyAppProcessingData.class); @@ -205,8 +205,13 @@ public void testFromPulsarToAppToPulsar() throws PulsarClientException { .topic(parentTopic) .create(), 10, (i, p) -> p.newMessage().key("a-key").value(i)); - await().until(() -> values.size() == 10); - assertThat(values).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + await().until(() -> messages.size() == 10); + assertThat(messages) + .extracting(org.apache.pulsar.client.api.Message::getValue) + .containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + assertThat(messages) + .extracting(org.apache.pulsar.client.api.Message::getProperties) + .allSatisfy(m -> assertThat(m).containsKey("traceparent")); CompletableResultCode completableResultCode = tracerProvider.forceFlush(); completableResultCode.whenComplete(() -> { From f694fab40d37ce77b916c72fdc9d6a34c8afd14a Mon Sep 17 00:00:00 2001 From: Michal Cukierman Date: Sat, 9 Mar 2024 13:36:16 +0100 Subject: [PATCH 2/2] #2519 Fix: PulsarOutgoingChannel tracing properties propagation --- .../tracing/TracingPropagationTest.java | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/tracing/TracingPropagationTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/tracing/TracingPropagationTest.java index bd520747a6..ed28666330 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/tracing/TracingPropagationTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/tracing/TracingPropagationTest.java @@ -50,6 +50,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata; import io.smallrye.reactive.messaging.pulsar.PulsarConnector; +import io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata; import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; import io.vertx.core.json.Json; @@ -82,9 +83,18 @@ static void shutdown() { GlobalOpenTelemetry.resetForTest(); } - @SuppressWarnings("ConstantConditions") @Test - public void testFromAppToPulsar() throws PulsarClientException { + public void testFromAppGeneratingDataToPulsar() throws PulsarClientException { + testFromAppToPulsar(MyAppGeneratingData.class); + } + + @Test + public void testFromAppGeneratingPulsarDataToPulsar() throws PulsarClientException { + testFromAppToPulsar(MyAppGeneratingPulsarData.class); + } + + @SuppressWarnings("ConstantConditions") + public void testFromAppToPulsar(Class applicationClass) throws PulsarClientException { List> messages = new ArrayList<>(); receive(client.newConsumer(Schema.INT32) .topic(topic) @@ -92,7 +102,7 @@ public void testFromAppToPulsar() throws PulsarClientException { .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(), 10, messages::add); - runApplication(getConfigForMyAppGeneratingData(), MyAppGeneratingData.class); + runApplication(getConfigForMyAppGeneratingData(), applicationClass); await().until(() -> messages.size() >= 10); List values = new ArrayList<>(); @@ -379,6 +389,18 @@ public Flow.Publisher> source() { } } + @ApplicationScoped + public static class MyAppGeneratingPulsarData { + @Outgoing("pulsar") + public Flow.Publisher> source() { + return Multi.createFrom().range(0, 10) + .map(Message::of) + .map(m -> m.addMetadata(PulsarOutgoingMessageMetadata.builder() + .withEventTime(1) + .build())); + } + } + @ApplicationScoped public static class MyAppProcessingData { @Incoming("source")