Skip to content

Commit

Permalink
Merge pull request #2520 from streamx-dev/main
Browse files Browse the repository at this point in the history
#2519 Fix: PulsarOutgoingChannel tracing properties propagation
  • Loading branch information
ozangunalp authored Mar 13, 2024
2 parents e40ef80 + f694fab commit 00ac1b3
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging.log;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -126,12 +125,15 @@ private TypedMessageBuilder<T> toMessageBuilder(Message<?> message, Producer<T>
final TypedMessageBuilder<T> messageBuilder;
if (optionalMetadata.isPresent()) {
PulsarOutgoingMessageMetadata metadata = optionalMetadata.get();
Map<String, String> properties = metadata.getProperties();
if (tracingEnabled) {
TracingUtils.traceOutgoing(instrumenter, message, new PulsarTrace.Builder()
.withProperties(metadata.getProperties())
PulsarTrace trace = new PulsarTrace.Builder()
.withProperties(properties)
.withSequenceId(metadata.getSequenceId())
.withTopic(producer.getTopic())
.build());
.build();
properties = trace.getProperties();
TracingUtils.traceOutgoing(instrumenter, message, trace);
}
messageBuilder = createMessageBuilder(message, metadata.getTransaction());

Expand All @@ -146,7 +148,7 @@ private TypedMessageBuilder<T> toMessageBuilder(Message<?> message, Producer<T>
messageBuilder.orderingKey(metadata.getOrderingKey());
}
if (metadata.getProperties() != null) {
messageBuilder.properties(metadata.getProperties());
messageBuilder.properties(properties);
}
if (metadata.getReplicatedClusters() != null) {
messageBuilder.replicationClusters(metadata.getReplicatedClusters());
Expand All @@ -166,12 +168,11 @@ private TypedMessageBuilder<T> toMessageBuilder(Message<?> message, Producer<T>
} else {
messageBuilder = createMessageBuilder(message, null);
if (tracingEnabled) {
Map<String, String> properties = new HashMap<>();
TracingUtils.traceOutgoing(instrumenter, message, new PulsarTrace.Builder()
.withProperties(properties)
PulsarTrace trace = new PulsarTrace.Builder()
.withTopic(producer.getTopic())
.build());
messageBuilder.properties(properties);
.build();
TracingUtils.traceOutgoing(instrumenter, message, trace);
messageBuilder.properties(trace.getProperties());
}
}
Object payload = message.getPayload();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.pulsar.PulsarConnector;
import io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata;
import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.core.json.Json;
Expand Down Expand Up @@ -82,17 +83,26 @@ static void shutdown() {
GlobalOpenTelemetry.resetForTest();
}

@SuppressWarnings("ConstantConditions")
@Test
public void testFromAppToPulsar() throws PulsarClientException {
public void testFromAppGeneratingDataToPulsar() throws PulsarClientException {
testFromAppToPulsar(MyAppGeneratingData.class);
}

@Test
public void testFromAppGeneratingPulsarDataToPulsar() throws PulsarClientException {
testFromAppToPulsar(MyAppGeneratingPulsarData.class);
}

@SuppressWarnings("ConstantConditions")
public void testFromAppToPulsar(Class<?> applicationClass) throws PulsarClientException {
List<org.apache.pulsar.client.api.Message<Integer>> messages = new ArrayList<>();
receive(client.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName(topic + "-consumer")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe(), 10, messages::add);

runApplication(getConfigForMyAppGeneratingData(), MyAppGeneratingData.class);
runApplication(getConfigForMyAppGeneratingData(), applicationClass);

await().until(() -> messages.size() >= 10);
List<Integer> values = new ArrayList<>();
Expand Down Expand Up @@ -191,12 +201,12 @@ public void testFromAppToPulsarWithBinaryCloudEvents() throws PulsarClientExcept
public void testFromPulsarToAppToPulsar() throws PulsarClientException {
String resultTopic = topic + "-result";
String parentTopic = topic + "-parent";
List<Integer> values = new ArrayList<>();
List<org.apache.pulsar.client.api.Message> messages = new ArrayList<>();
receive(client.newConsumer(Schema.INT32)
.topic(resultTopic)
.subscriptionName(topic + "-consumer")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe(), 10, i -> values.add(i.getValue()));
.subscribe(), 10, messages::add);

runApplication(getConfigForMyAppProcessingData(resultTopic, parentTopic), MyAppProcessingData.class);

Expand All @@ -205,8 +215,13 @@ public void testFromPulsarToAppToPulsar() throws PulsarClientException {
.topic(parentTopic)
.create(), 10, (i, p) -> p.newMessage().key("a-key").value(i));

await().until(() -> values.size() == 10);
assertThat(values).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
await().until(() -> messages.size() == 10);
assertThat(messages)
.extracting(org.apache.pulsar.client.api.Message::getValue)
.containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
assertThat(messages)
.extracting(org.apache.pulsar.client.api.Message::getProperties)
.allSatisfy(m -> assertThat(m).containsKey("traceparent"));

CompletableResultCode completableResultCode = tracerProvider.forceFlush();
completableResultCode.whenComplete(() -> {
Expand Down Expand Up @@ -374,6 +389,18 @@ public Flow.Publisher<Message<String>> source() {
}
}

@ApplicationScoped
public static class MyAppGeneratingPulsarData {
@Outgoing("pulsar")
public Flow.Publisher<Message<Integer>> source() {
return Multi.createFrom().range(0, 10)
.map(Message::of)
.map(m -> m.addMetadata(PulsarOutgoingMessageMetadata.builder()
.withEventTime(1)
.build()));
}
}

@ApplicationScoped
public static class MyAppProcessingData {
@Incoming("source")
Expand Down

0 comments on commit 00ac1b3

Please sign in to comment.