Skip to content

Commit

Permalink
Pulsar force schema fetch on pulsar-client-thread for all schema requ…
Browse files Browse the repository at this point in the history
…iring schema info fetching

Fixes #2493
  • Loading branch information
ozangunalp committed Feb 22, 2024
1 parent f810ab8 commit 545a4c9
Showing 1 changed file with 4 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
import java.util.stream.Collectors;

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.GlobalOpenTelemetry;
Expand All @@ -25,7 +23,6 @@
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarAttributesExtractor;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTrace;
Expand Down Expand Up @@ -115,8 +112,8 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
.until(m -> closed.get())
.plug(msgMulti -> {
// Calling getValue on the pulsar-client-internal thread to make sure the SchemaInfo is fetched
if (schema instanceof AutoConsumeSchema || schema instanceof KeyValueSchema) {
return msgMulti.onItem().call(msg -> Uni.createFrom().item(msg::getValue));
if (schema.requireFetchingSchemaInfo()) {
return msgMulti.onItem().invoke(org.apache.pulsar.client.api.Message::getValue);
} else {
return msgMulti;
}
Expand All @@ -139,11 +136,8 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
.filter(m -> m.size() > 0)
.plug(msgMulti -> {
// Calling getValue on the pulsar-client-internal thread to make sure the SchemaInfo is fetched
if (schema instanceof AutoConsumeSchema || schema instanceof KeyValueSchema) {
return msgMulti.onItem().call(msg -> Uni.createFrom().item(() -> {
msg.forEach(m -> m.getValue());
return null;
}));
if (schema.requireFetchingSchemaInfo()) {
return msgMulti.onItem().invoke(msg -> msg.forEach(org.apache.pulsar.client.api.Message::getValue));
} else {
return msgMulti;
}
Expand Down

0 comments on commit 545a4c9

Please sign in to comment.