Skip to content

Commit

Permalink
#2519 Fix: PulsarOutgoingChannel tracing properties propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
michalcukierman committed Mar 13, 2024
1 parent 2de14d4 commit 756a3e8
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging.log;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -126,12 +125,15 @@ private TypedMessageBuilder<T> toMessageBuilder(Message<?> message, Producer<T>
final TypedMessageBuilder<T> messageBuilder;
if (optionalMetadata.isPresent()) {
PulsarOutgoingMessageMetadata metadata = optionalMetadata.get();
Map<String, String> properties = metadata.getProperties();
if (tracingEnabled) {
TracingUtils.traceOutgoing(instrumenter, message, new PulsarTrace.Builder()
.withProperties(metadata.getProperties())
PulsarTrace trace = new PulsarTrace.Builder()
.withProperties(properties)
.withSequenceId(metadata.getSequenceId())
.withTopic(producer.getTopic())
.build());
.build();
properties = trace.getProperties();
TracingUtils.traceOutgoing(instrumenter, message, trace);
}
messageBuilder = createMessageBuilder(message, metadata.getTransaction());

Expand All @@ -146,7 +148,7 @@ private TypedMessageBuilder<T> toMessageBuilder(Message<?> message, Producer<T>
messageBuilder.orderingKey(metadata.getOrderingKey());
}
if (metadata.getProperties() != null) {
messageBuilder.properties(metadata.getProperties());
messageBuilder.properties(properties);
}
if (metadata.getReplicatedClusters() != null) {
messageBuilder.replicationClusters(metadata.getReplicatedClusters());
Expand All @@ -166,12 +168,11 @@ private TypedMessageBuilder<T> toMessageBuilder(Message<?> message, Producer<T>
} else {
messageBuilder = createMessageBuilder(message, null);
if (tracingEnabled) {
Map<String, String> properties = new HashMap<>();
TracingUtils.traceOutgoing(instrumenter, message, new PulsarTrace.Builder()
.withProperties(properties)
PulsarTrace trace = new PulsarTrace.Builder()
.withTopic(producer.getTopic())
.build());
messageBuilder.properties(properties);
.build();
TracingUtils.traceOutgoing(instrumenter, message, trace);
messageBuilder.properties(trace.getProperties());
}
}
Object payload = message.getPayload();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ public void testFromAppToPulsarWithBinaryCloudEvents() throws PulsarClientExcept
public void testFromPulsarToAppToPulsar() throws PulsarClientException {
String resultTopic = topic + "-result";
String parentTopic = topic + "-parent";
List<Integer> values = new ArrayList<>();
List<org.apache.pulsar.client.api.Message> messages = new ArrayList<>();
receive(client.newConsumer(Schema.INT32)
.topic(resultTopic)
.subscriptionName(topic + "-consumer")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe(), 10, i -> values.add(i.getValue()));
.subscribe(), 10, messages::add);

runApplication(getConfigForMyAppProcessingData(resultTopic, parentTopic), MyAppProcessingData.class);

Expand All @@ -205,8 +205,13 @@ public void testFromPulsarToAppToPulsar() throws PulsarClientException {
.topic(parentTopic)
.create(), 10, (i, p) -> p.newMessage().key("a-key").value(i));

await().until(() -> values.size() == 10);
assertThat(values).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
await().until(() -> messages.size() == 10);
assertThat(messages)
.extracting(org.apache.pulsar.client.api.Message::getValue)
.containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
assertThat(messages)
.extracting(org.apache.pulsar.client.api.Message::getProperties)
.allSatisfy(m -> assertThat(m).containsKey("traceparent"));

CompletableResultCode completableResultCode = tracerProvider.forceFlush();
completableResultCode.whenComplete(() -> {
Expand Down

0 comments on commit 756a3e8

Please sign in to comment.